From fabda095ca751e5c634119374a0fd81d97ad0ffe Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Wed, 22 Jan 2025 14:32:43 -0800 Subject: [PATCH 01/10] remove cdc entities/storages/datasets --- .../configuration/groupassignee/dataset.yaml | 6 -- .../groupassignee/entities/groupassignee.yaml | 33 ------- .../storages/group_assignees.yaml | 67 --------------- .../configuration/groupedmessage/dataset.yaml | 6 -- .../entities/groupedmessage.yaml | 35 -------- .../storages/grouped_messages.yaml | 86 ------------------- 6 files changed, 233 deletions(-) delete mode 100644 snuba/datasets/configuration/groupassignee/dataset.yaml delete mode 100644 snuba/datasets/configuration/groupassignee/entities/groupassignee.yaml delete mode 100644 snuba/datasets/configuration/groupassignee/storages/group_assignees.yaml delete mode 100644 snuba/datasets/configuration/groupedmessage/dataset.yaml delete mode 100644 snuba/datasets/configuration/groupedmessage/entities/groupedmessage.yaml delete mode 100644 snuba/datasets/configuration/groupedmessage/storages/grouped_messages.yaml diff --git a/snuba/datasets/configuration/groupassignee/dataset.yaml b/snuba/datasets/configuration/groupassignee/dataset.yaml deleted file mode 100644 index a1164a30ad7..00000000000 --- a/snuba/datasets/configuration/groupassignee/dataset.yaml +++ /dev/null @@ -1,6 +0,0 @@ -version: v1 -kind: dataset -name: groupassignee - -entities: - - groupassignee diff --git a/snuba/datasets/configuration/groupassignee/entities/groupassignee.yaml b/snuba/datasets/configuration/groupassignee/entities/groupassignee.yaml deleted file mode 100644 index 6222a9c2443..00000000000 --- a/snuba/datasets/configuration/groupassignee/entities/groupassignee.yaml +++ /dev/null @@ -1,33 +0,0 @@ -version: v1 -kind: entity -name: groupassignee - -schema: - [ - { name: offset, type: UInt, args: { size: 64 } }, - { name: record_deleted, type: UInt, args: { size: 8 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: group_id, type: UInt, args: { size: 64 } }, - { name: date_added, type: DateTime, args: { schema_modifiers: [nullable] } }, - { name: user_id, type: UInt, args: { size: 64, schema_modifiers: [nullable] } }, - { name: team_id, type: UInt, args: { size: 64, schema_modifiers: [nullable] } }, - ] - -storages: - - storage: groupassignees - is_writable: true -storage_selector: - selector: DefaultQueryStorageSelector - -query_processors: - - processor: BasicFunctionsProcessor -validate_data_model: error -validators: [] -required_time_column: null -join_relationships: - owns: - rhs_entity: events - join_type: left - columns: - - [project_id, project_id] - - [group_id, group_id] diff --git a/snuba/datasets/configuration/groupassignee/storages/group_assignees.yaml b/snuba/datasets/configuration/groupassignee/storages/group_assignees.yaml deleted file mode 100644 index e12d9d3f4ab..00000000000 --- a/snuba/datasets/configuration/groupassignee/storages/group_assignees.yaml +++ /dev/null @@ -1,67 +0,0 @@ -version: v1 -kind: cdc_storage -name: groupassignees -storage: - key: groupassignees - set_key: cdc -readiness_state: deprecate -schema: - columns: - [ - { name: offset, type: UInt, args: { size: 64 } }, - { name: record_deleted, type: UInt, args: { size: 8 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: group_id, type: UInt, args: { size: 64 } }, - { - name: date_added, - type: DateTime, - args: { schema_modifiers: [nullable] }, - }, - { - name: user_id, - type: UInt, - args: { schema_modifiers: [nullable], size: 64 }, - }, - { - 
name: team_id, - type: UInt, - args: { schema_modifiers: [nullable], size: 64 }, - }, - ] - local_table_name: groupassignee_local - dist_table_name: groupassignee_dist -default_control_topic: cdc_control -postgres_table: sentry_groupasignee -row_processor: - processor: GroupAssigneeRowProcessor -allocation_policies: - - name: ConcurrentRateLimitAllocationPolicy - args: - required_tenant_types: - - organization_id - - referrer - - project_id - default_config_overrides: - is_enforced: 0 - - name: BytesScannedWindowAllocationPolicy - args: - required_tenant_types: - - organization_id - - referrer - default_config_overrides: - is_enforced: 1 - throttled_thread_number: 1 - org_limit_bytes_scanned: 100000 -query_processors: - - processor: PrewhereProcessor - args: - prewhere_candidates: - - project_id - - processor: ConsistencyEnforcerProcessor -stream_loader: - processor: GroupAssigneeProcessor - default_topic: cdc - pre_filter: - type: CdcTableNameMessageFilter - args: - postgres_table: sentry_groupasignee diff --git a/snuba/datasets/configuration/groupedmessage/dataset.yaml b/snuba/datasets/configuration/groupedmessage/dataset.yaml deleted file mode 100644 index 4f49abe045b..00000000000 --- a/snuba/datasets/configuration/groupedmessage/dataset.yaml +++ /dev/null @@ -1,6 +0,0 @@ -version: v1 -kind: dataset -name: groupedmessage - -entities: - - groupedmessage diff --git a/snuba/datasets/configuration/groupedmessage/entities/groupedmessage.yaml b/snuba/datasets/configuration/groupedmessage/entities/groupedmessage.yaml deleted file mode 100644 index ae01cafd5e7..00000000000 --- a/snuba/datasets/configuration/groupedmessage/entities/groupedmessage.yaml +++ /dev/null @@ -1,35 +0,0 @@ -version: v1 -kind: entity -name: groupedmessage - -schema: - [ - { name: offset, type: UInt, args: { size: 64 } }, - { name: record_deleted, type: UInt, args: { size: 8 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: id, type: UInt, args: { size: 64 } }, - { name: status, type: UInt, args: { schema_modifiers: [nullable], size: 8 } }, - { name: last_seen, type: DateTime, args: { schema_modifiers: [nullable] } }, - { name: first_seen, type: DateTime, args: { schema_modifiers: [nullable] } }, - { name: active_at, type: DateTime, args: { schema_modifiers: [nullable] } }, - { name: first_release_id, type: UInt, args: { schema_modifiers: [nullable], size: 64 } }, - ] - -storages: - - storage: groupedmessages - is_writable: true -storage_selector: - selector: DefaultQueryStorageSelector - -query_processors: - - processor: BasicFunctionsProcessor -validate_data_model: error -validators: [] -required_time_column: null -join_relationships: - groups: - rhs_entity: events - join_type: left - columns: - - [project_id, project_id] - - [id, group_id] diff --git a/snuba/datasets/configuration/groupedmessage/storages/grouped_messages.yaml b/snuba/datasets/configuration/groupedmessage/storages/grouped_messages.yaml deleted file mode 100644 index 8c1f77b00aa..00000000000 --- a/snuba/datasets/configuration/groupedmessage/storages/grouped_messages.yaml +++ /dev/null @@ -1,86 +0,0 @@ -version: v1 -kind: cdc_storage -name: groupedmessages -storage: - key: groupedmessages - set_key: cdc -readiness_state: deprecate -schema: - columns: - [ - { name: offset, type: UInt, args: { size: 64 } }, - { name: record_deleted, type: UInt, args: { size: 8 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: id, type: UInt, args: { size: 64 } }, - { - name: status, - type: UInt, - args: { schema_modifiers: 
[nullable], size: 8 }, - }, - { - name: last_seen, - type: DateTime, - args: { schema_modifiers: [nullable] }, - }, - { - name: first_seen, - type: DateTime, - args: { schema_modifiers: [nullable] }, - }, - { - name: active_at, - type: DateTime, - args: { schema_modifiers: [nullable] }, - }, - { - name: first_release_id, - type: UInt, - args: { schema_modifiers: [nullable], size: 64 }, - }, - ] - local_table_name: groupedmessage_local - dist_table_name: groupedmessage_dist - not_deleted_mandatory_condition: record_deleted -default_control_topic: cdc_control -postgres_table: sentry_groupedmessage -row_processor: - processor: GroupedMessageRowProcessor -allocation_policies: - - name: ConcurrentRateLimitAllocationPolicy - args: - required_tenant_types: - - organization_id - - referrer - - project_id - default_config_overrides: - is_enforced: 0 - - name: BytesScannedWindowAllocationPolicy - args: - required_tenant_types: - - organization_id - - referrer - default_config_overrides: - is_enforced: 1 - throttled_thread_number: 1 - org_limit_bytes_scanned: 100000 - - name: ReferrerGuardRailPolicy - args: - required_tenant_types: - - referrer - default_config_overrides: - is_enforced: 0 - is_active: 0 -query_processors: - - processor: PrewhereProcessor - args: - prewhere_candidates: - - project_id - - id - - processor: ConsistencyEnforcerProcessor -stream_loader: - processor: GroupedMessageProcessor - default_topic: cdc - pre_filter: - type: CdcTableNameMessageFilter - args: - postgres_table: sentry_groupedmessage From 5c80951a1b9243257f1d3d60fefa809df4643f7e Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Wed, 22 Jan 2025 14:33:32 -0800 Subject: [PATCH 02/10] remove CDC stuff --- snuba/settings/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index c952b57c15c..e73a18ad8c7 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -97,7 +97,6 @@ "ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"), "verify": os.environ.get("CLICKHOUSE_VERIFY"), "storage_sets": { - "cdc", "discover", "events", "events_ro", From 6507cb422910af31a7bb49f50c2856641ba70c19 Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Wed, 22 Jan 2025 14:49:11 -0800 Subject: [PATCH 03/10] remove cdc tests --- tests/datasets/cdc/__init__.py | 0 tests/datasets/cdc/test_groupassignee.py | 238 ---------------- tests/datasets/cdc/test_groupedmessage.py | 301 --------------------- tests/datasets/cdc/test_message_filters.py | 47 ---- 4 files changed, 586 deletions(-) delete mode 100644 tests/datasets/cdc/__init__.py delete mode 100644 tests/datasets/cdc/test_groupassignee.py delete mode 100644 tests/datasets/cdc/test_groupedmessage.py delete mode 100644 tests/datasets/cdc/test_message_filters.py diff --git a/tests/datasets/cdc/__init__.py b/tests/datasets/cdc/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/tests/datasets/cdc/test_groupassignee.py b/tests/datasets/cdc/test_groupassignee.py deleted file mode 100644 index 68422c36a4a..00000000000 --- a/tests/datasets/cdc/test_groupassignee.py +++ /dev/null @@ -1,238 +0,0 @@ -from datetime import datetime, timezone - -import pytest - -from snuba.clusters.cluster import ClickhouseClientSettings -from snuba.consumers.types import KafkaMessageMetadata -from snuba.datasets.cdc.groupassignee_processor import ( - GroupAssigneeProcessor, - GroupAssigneeRow, -) -from snuba.datasets.cdc.types import DeleteEvent, InsertEvent, UpdateEvent -from snuba.datasets.storages.factory import 
get_writable_storage -from snuba.datasets.storages.storage_key import StorageKey -from snuba.processor import InsertBatch -from tests.helpers import write_processed_messages - - -@pytest.mark.clickhouse_db -@pytest.mark.redis_db -class TestGroupassignee: - storage = get_writable_storage(StorageKey.GROUPASSIGNEES) - - UPDATE_MSG_NO_KEY_CHANGE = UpdateEvent( - { - "columnnames": [ - "id", - "project_id", - "group_id", - "user_id", - "date_added", - "team_id", - ], - "columntypes": [ - "bigint", - "bigint", - "bigint", - "integer", - "timestamp with time zone", - "bigint", - ], - "columnvalues": [35, 2, 1359, 1, "2019-09-19 00:17:55+00", None], - "event": "change", - "kind": "update", - "oldkeys": { - "keynames": ["project_id", "group_id"], - "keytypes": ["bigint", "bigint"], - "keyvalues": [2, 1359], - }, - "schema": "public", - "table": "sentry_groupasignee", - "timestamp": "2019-09-19 00:06:56.376853+00", - "xid": 3803891, - } - ) - UPDATE_MSG_WITH_KEY_CHANGE = UpdateEvent( - { - "columnnames": [ - "id", - "project_id", - "group_id", - "user_id", - "date_added", - "team_id", - ], - "columntypes": [ - "bigint", - "bigint", - "bigint", - "integer", - "timestamp with time zone", - "bigint", - ], - "columnvalues": [35, 3, 1359, 1, "2019-09-19 00:17:55+00", None], - "event": "change", - "kind": "update", - "oldkeys": { - "keynames": ["project_id", "group_id"], - "keytypes": ["bigint", "bigint"], - "keyvalues": [2, 1359], - }, - "schema": "public", - "table": "sentry_groupasignee", - "timestamp": "2019-09-19 00:06:56.376853+00", - "xid": 3803891, - } - ) - - DELETE_MSG = DeleteEvent( - { - "event": "change", - "kind": "delete", - "oldkeys": { - "keynames": ["project_id", "group_id"], - "keytypes": ["bigint", "bigint"], - "keyvalues": [2, 1359], - }, - "schema": "public", - "table": "sentry_groupasignee", - "timestamp": "2019-09-19 00:17:21.44787+00", - "xid": 3803954, - } - ) - - INSERT_MSG = InsertEvent( - { - "columnnames": [ - "id", - "project_id", - "group_id", - "user_id", - "date_added", - "team_id", - ], - "columntypes": [ - "bigint", - "bigint", - "bigint", - "integer", - "timestamp with time zone", - "bigint", - ], - "columnvalues": [35, 2, 1359, 1, "2019-09-19 00:17:55+00", None], - "event": "change", - "kind": "insert", - "schema": "public", - "table": "sentry_groupasignee", - "timestamp": "2019-09-19 00:17:55.032443+00", - "xid": 3803982, - } - ) - - PROCESSED = { - "offset": 42, - "project_id": 2, - "group_id": 1359, - "record_deleted": 0, - "user_id": 1, - "team_id": None, - "date_added": datetime(2019, 9, 19, 0, 17, 55, tzinfo=timezone.utc), - } - - PROCESSED_UPDATE = { - "offset": 42, - "project_id": 3, - "group_id": 1359, - "record_deleted": 0, - "user_id": 1, - "team_id": None, - "date_added": datetime(2019, 9, 19, 0, 17, 55, tzinfo=timezone.utc), - } - - DELETED = { - "offset": 42, - "project_id": 2, - "group_id": 1359, - "record_deleted": 1, - "user_id": None, - "team_id": None, - "date_added": None, - } - - def test_messages(self) -> None: - processor = GroupAssigneeProcessor() - - metadata = KafkaMessageMetadata( - offset=42, partition=0, timestamp=datetime(1970, 1, 1) - ) - - ret = processor.process_message(self.INSERT_MSG, metadata) - assert ret == InsertBatch( - [self.PROCESSED], - datetime(2019, 9, 19, 0, 17, 55, 32443, tzinfo=timezone.utc), - ) - write_processed_messages(self.storage, [ret]) - results = ( - self.storage.get_cluster() - .get_query_connection(ClickhouseClientSettings.QUERY) - .execute("SELECT * FROM groupassignee_local;") - .results - ) - assert 
results[0] == ( - 42, # offset - 0, # deleted - 2, # project_id - 1359, # group_id - datetime(2019, 9, 19, 0, 17, 55), - 1, # user_id - None, # team_id - ) - - ret = processor.process_message(self.UPDATE_MSG_NO_KEY_CHANGE, metadata) - assert ret == InsertBatch( - [self.PROCESSED], - datetime(2019, 9, 19, 0, 6, 56, 376853, tzinfo=timezone.utc), - ) - - # Tests an update with key change which becomes a two inserts: - # one deletion and the insertion of the new row. - ret = processor.process_message(self.UPDATE_MSG_WITH_KEY_CHANGE, metadata) - assert ret == InsertBatch( - [self.DELETED, self.PROCESSED_UPDATE], - datetime(2019, 9, 19, 0, 6, 56, 376853, tzinfo=timezone.utc), - ) - - ret = processor.process_message(self.DELETE_MSG, metadata) - assert ret == InsertBatch( - [self.DELETED], - datetime(2019, 9, 19, 0, 17, 21, 447870, tzinfo=timezone.utc), - ) - - def test_bulk_load(self) -> None: - row = GroupAssigneeRow.from_bulk( - { - "project_id": "2", - "group_id": "1359", - "date_added": "2019-09-19 00:17:55+00", - "user_id": "1", - "team_id": "", - } - ) - write_processed_messages( - self.storage, [InsertBatch([row.to_clickhouse()], None)] - ) - ret = ( - self.storage.get_cluster() - .get_query_connection(ClickhouseClientSettings.QUERY) - .execute("SELECT * FROM groupassignee_local;") - .results - ) - assert ret[0] == ( - 0, # offset - 0, # deleted - 2, # project_id - 1359, # group_id - datetime(2019, 9, 19, 0, 17, 55), - 1, # user_id - None, # team_id - ) diff --git a/tests/datasets/cdc/test_groupedmessage.py b/tests/datasets/cdc/test_groupedmessage.py deleted file mode 100644 index 97c16052feb..00000000000 --- a/tests/datasets/cdc/test_groupedmessage.py +++ /dev/null @@ -1,301 +0,0 @@ -from datetime import datetime, timezone - -import pytest - -from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster -from snuba.clusters.storage_sets import StorageSetKey -from snuba.consumers.types import KafkaMessageMetadata -from snuba.datasets.cdc.groupedmessage_processor import ( - GroupedMessageProcessor, - GroupedMessageRow, -) -from snuba.datasets.cdc.types import DeleteEvent, InsertEvent, UpdateEvent -from snuba.datasets.storages.factory import get_writable_storage -from snuba.datasets.storages.storage_key import StorageKey -from snuba.processor import InsertBatch -from tests.helpers import write_processed_messages - - -@pytest.mark.clickhouse_db -@pytest.mark.redis_db -class TestGroupedMessage: - storage = get_writable_storage(StorageKey.GROUPEDMESSAGES) - - UPDATE_MSG = UpdateEvent( - { - "columnnames": [ - "id", - "logger", - "level", - "message", - "view", - "status", - "times_seen", - "last_seen", - "first_seen", - "data", - "score", - "project_id", - "time_spent_total", - "time_spent_count", - "resolved_at", - "active_at", - "is_public", - "platform", - "num_comments", - "first_release_id", - "short_id", - ], - "columntypes": [ - "bigint", - "character varying(64)", - "integer", - "text", - "character varying(200)", - "integer", - "integer", - "timestamp with time zone", - "timestamp with time zone", - "text", - "integer", - "bigint", - "integer", - "integer", - "timestamp with time zone", - "timestamp with time zone", - "boolean", - "character varying(64)", - "integer", - "bigint", - "bigint", - ], - "columnvalues": [ - 74, - "", - 40, - " ZeroDivisionError integer division or modulo by " - "zero client3.py __main__ in ", - "__main__ in ", - 0, - 2, - "2019-06-19 06:46:28+00", - "2019-06-19 06:45:32+00", - 
"eJyT7tuwzAM3PkV2pzJiO34VRSdmvxAgA5dCtViDAGyJEi0AffrSxrZOlSTjrzj3Z1MrOBekCWHBcQaPj4xhXe72WyDv6YU0ouynnDGpMxzrEJSSzCrC+p7Vz8sgNhAvhdOZ/pKOKHd0PC5C9yqtjuPddcPQ9n0w8hPiLRHsWvZGsWD/91xIya2IFxz7vJWfTUlHHnwSCEBUkbTZrxCCcOf2baY/XTU1VJm9cjHL4JriHPYvOnliyP0Jt2q4SpLkz7v6owW9E9rEOvl0PawczxcvkLIWppxg==", - 1560926969, - 2, - 0, - 0, - None, - "2019-06-19 06:45:32+00", - False, - "python", - 0, - None, - 20, - ], - "event": "change", - "kind": "update", - "oldkeys": {"keynames": ["id"], "keytypes": ["bigint"], "keyvalues": [74]}, - "schema": "public", - "table": "sentry_groupedmessage", - "timestamp": "2019-09-19 00:17:21.44787+00", - "xid": 2380866, - } - ) - - DELETE_MSG = DeleteEvent( - { - "event": "change", - "kind": "delete", - "oldkeys": { - "keynames": ["id", "project_id"], - "keytypes": ["bigint", "bigint"], - "keyvalues": [74, 2], - }, - "schema": "public", - "table": "sentry_groupedmessage", - "timestamp": "2019-09-19 00:17:21.44787+00", - "xid": 2380866, - } - ) - - INSERT_MSG = InsertEvent( - { - "columnnames": [ - "id", - "logger", - "level", - "message", - "view", - "status", - "times_seen", - "last_seen", - "first_seen", - "data", - "score", - "project_id", - "time_spent_total", - "time_spent_count", - "resolved_at", - "active_at", - "is_public", - "platform", - "num_comments", - "first_release_id", - "short_id", - ], - "columntypes": [ - "bigint", - "character varying(64)", - "integer", - "text", - "character varying(200)", - "integer", - "integer", - "timestamp with time zone", - "timestamp with time zone", - "text", - "integer", - "bigint", - "integer", - "integer", - "timestamp with time zone", - "timestamp with time zone", - "boolean", - "character varying(64)", - "integer", - "bigint", - "bigint", - ], - "columnvalues": [ - 74, - "", - 40, - " ZeroDivisionError integer division or modulo by " - "zero client3.py __main__ in ", - "__main__ in ", - 0, - 2, - "2019-06-19 06:46:28+00", - "2019-06-19 06:45:32+00", - "eJyT7tuwzAM3PkV2pzJiO34VRSdmvxAgA5dCtViDAGyJEi0AffrSxrZOlSTjrzj3Z1MrOBekCWHBcQaPj4xhXe72WyDv6YU0ouynnDGpMxzrEJSSzCrC+p7Vz8sgNhAvhdOZ/pKOKHd0PC5C9yqtjuPddcPQ9n0w8hPiLRHsWvZGsWD/91xIya2IFxz7vJWfTUlHHnwSCEBUkbTZrxCCcOf2baY/XTU1VJm9cjHL4JriHPYvOnliyP0Jt2q4SpLkz7v6owW9E9rEOvl0PawczxcvkLIWppxg==", - 1560926969, - 2, - 0, - 0, - None, - "2019-06-19 06:45:32+00", - False, - "python", - 0, - None, - 20, - ], - "event": "change", - "kind": "insert", - "schema": "public", - "table": "sentry_groupedmessage", - "timestamp": "2019-09-19 00:17:21.44787+00", - "xid": 2380866, - } - ) - - PROCESSED = { - "offset": 42, - "project_id": 2, - "id": 74, - "record_deleted": 0, - "status": 0, - "last_seen": datetime(2019, 6, 19, 6, 46, 28, tzinfo=timezone.utc), - "first_seen": datetime(2019, 6, 19, 6, 45, 32, tzinfo=timezone.utc), - "active_at": datetime(2019, 6, 19, 6, 45, 32, tzinfo=timezone.utc), - "first_release_id": None, - } - - DELETED = { - "offset": 42, - "project_id": 2, - "id": 74, - "record_deleted": 1, - "status": None, - "last_seen": None, - "first_seen": None, - "active_at": None, - "first_release_id": None, - } - - def test_messages(self) -> None: - processor = GroupedMessageProcessor() - - metadata = KafkaMessageMetadata( - offset=42, partition=0, timestamp=datetime(1970, 1, 1) - ) - - ret = processor.process_message(self.INSERT_MSG, metadata) - assert ret == InsertBatch( - [self.PROCESSED], - datetime(2019, 9, 19, 0, 17, 21, 447870, tzinfo=timezone.utc), - ) - write_processed_messages(self.storage, [ret]) - results = ( - get_cluster(StorageSetKey.EVENTS) - 
.get_query_connection(ClickhouseClientSettings.INSERT) - .execute("SELECT * FROM groupedmessage_local;") - .results - ) - assert results[0] == ( - 42, # offset - 0, # deleted - 2, # project_id - 74, # id - 0, # status - datetime(2019, 6, 19, 6, 46, 28), - datetime(2019, 6, 19, 6, 45, 32), - datetime(2019, 6, 19, 6, 45, 32), - None, - ) - - ret = processor.process_message(self.UPDATE_MSG, metadata) - assert ret == InsertBatch( - [self.PROCESSED], - datetime(2019, 9, 19, 0, 17, 21, 447870, tzinfo=timezone.utc), - ) - - ret = processor.process_message(self.DELETE_MSG, metadata) - assert ret == InsertBatch( - [self.DELETED], - datetime(2019, 9, 19, 0, 17, 21, 447870, tzinfo=timezone.utc), - ) - - def test_bulk_load(self) -> None: - row = GroupedMessageRow.from_bulk( - { - "project_id": "2", - "id": "10", - "status": "0", - "last_seen": "2019-06-28 17:57:32+00", - "first_seen": "2019-06-28 06:40:17+00", - "active_at": "2019-06-28 06:40:17+00", - "first_release_id": "26", - } - ) - write_processed_messages( - self.storage, [InsertBatch([row.to_clickhouse()], None)] - ) - ret = ( - get_cluster(StorageSetKey.EVENTS) - .get_query_connection(ClickhouseClientSettings.QUERY) - .execute("SELECT * FROM groupedmessage_local;") - .results - ) - assert ret[0] == ( - 0, # offset - 0, # deleted - 2, # project_id - 10, # id - 0, # status - datetime(2019, 6, 28, 17, 57, 32), - datetime(2019, 6, 28, 6, 40, 17), - datetime(2019, 6, 28, 6, 40, 17), - 26, - ) diff --git a/tests/datasets/cdc/test_message_filters.py b/tests/datasets/cdc/test_message_filters.py deleted file mode 100644 index f7d25263627..00000000000 --- a/tests/datasets/cdc/test_message_filters.py +++ /dev/null @@ -1,47 +0,0 @@ -from datetime import datetime - -from arroyo.backends.kafka import KafkaPayload -from arroyo.types import BrokerValue, Message, Partition, Topic - -from snuba.datasets.message_filters import CdcTableNameMessageFilter - - -def test_table_name_filter() -> None: - table_name = "table_name" - message_filter = CdcTableNameMessageFilter(table_name) - - # Messages that math the table should not be dropped. - assert not message_filter.should_drop( - Message( - BrokerValue( - KafkaPayload(None, b"", [("table", table_name.encode("utf8"))]), - Partition(Topic("topic"), 0), - 0, - datetime.now(), - ) - ) - ) - - # Messages without a table should be dropped. - assert message_filter.should_drop( - Message( - BrokerValue( - KafkaPayload(None, b"", []), - Partition(Topic("topic"), 0), - 0, - datetime.now(), - ) - ) - ) - - # Messages from a different table should be dropped. 
- assert message_filter.should_drop( - Message( - BrokerValue( - KafkaPayload(None, b"", [("table", b"other_table")]), - Partition(Topic("topic"), 0), - 0, - datetime.now(), - ) - ) - ) From 1bae97333985057ab2d6203c9ab74341a7f9f9bc Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Wed, 22 Jan 2025 14:54:20 -0800 Subject: [PATCH 04/10] remove more stuff --- snuba/datasets/cdc/groupassignee_processor.py | 110 ---------------- tests/snapshots/test_postgres_snapshot.py | 122 ------------------ 2 files changed, 232 deletions(-) delete mode 100644 snuba/datasets/cdc/groupassignee_processor.py delete mode 100644 tests/snapshots/test_postgres_snapshot.py diff --git a/snuba/datasets/cdc/groupassignee_processor.py b/snuba/datasets/cdc/groupassignee_processor.py deleted file mode 100644 index ee5d8410680..00000000000 --- a/snuba/datasets/cdc/groupassignee_processor.py +++ /dev/null @@ -1,110 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from datetime import datetime -from typing import Any, Mapping, Optional, Sequence, Union - -from snuba.datasets.cdc.cdcprocessors import ( - CdcMessageRow, - CdcProcessor, - parse_postgres_datetime, - postgres_date_to_clickhouse, -) -from snuba.writer import WriterTableRow - - -@dataclass(frozen=True) -class GroupAssigneeRecord: - date_added: Union[datetime, str] - user_id: Optional[int] - team_id: Optional[int] - - -@dataclass(frozen=True) -class GroupAssigneeRow(CdcMessageRow): - offset: Optional[int] - record_deleted: bool - project_id: int - group_id: int - record_content: Union[None, GroupAssigneeRecord] - - @classmethod - def from_wal( - cls, - offset: int, - columnnames: Sequence[str], - columnvalues: Sequence[Any], - ) -> GroupAssigneeRow: - raw_data = dict(zip(columnnames, columnvalues)) - return cls( - offset=offset, - record_deleted=False, - project_id=raw_data["project_id"], - group_id=raw_data["group_id"], - record_content=GroupAssigneeRecord( - date_added=parse_postgres_datetime(raw_data["date_added"]), - user_id=int(raw_data["user_id"]) - if raw_data["user_id"] is not None - else None, - team_id=int(raw_data["team_id"]) - if raw_data["team_id"] is not None - else None, - ), - ) - - @classmethod - def from_bulk( - cls, - row: Mapping[str, Any], - ) -> GroupAssigneeRow: - return cls( - offset=0, - record_deleted=False, - project_id=row["project_id"], - group_id=row["group_id"], - record_content=GroupAssigneeRecord( - date_added=postgres_date_to_clickhouse(row["date_added"]), - user_id=int(row["user_id"]) if row["user_id"] != "" else None, - team_id=int(row["team_id"]) if row["team_id"] != "" else None, - ), - ) - - def to_clickhouse(self) -> WriterTableRow: - record = self.record_content - return { - "offset": self.offset if self.offset is not None else 0, - "project_id": self.project_id, - "group_id": self.group_id, - "record_deleted": 1 if self.record_deleted else 0, - "date_added": None if not record else record.date_added, - "user_id": None if not record else record.user_id, - "team_id": None if not record else record.team_id, - } - - -class GroupAssigneeProcessor(CdcProcessor): - def __init__(self) -> None: - postgres_table = "sentry_groupasignee" - super().__init__( - pg_table=postgres_table, - message_row_class=GroupAssigneeRow, - ) - - def _process_delete( - self, - offset: int, - key: Mapping[str, Any], - ) -> Sequence[WriterTableRow]: - key_names = key["keynames"] - key_values = key["keyvalues"] - project_id = key_values[key_names.index("project_id")] - group_id = key_values[key_names.index("group_id")] - 
return [ - GroupAssigneeRow( - offset=offset, - record_deleted=True, - project_id=project_id, - group_id=group_id, - record_content=None, - ).to_clickhouse() - ] diff --git a/tests/snapshots/test_postgres_snapshot.py b/tests/snapshots/test_postgres_snapshot.py deleted file mode 100644 index 627aa2b5f82..00000000000 --- a/tests/snapshots/test_postgres_snapshot.py +++ /dev/null @@ -1,122 +0,0 @@ -from pathlib import Path - -import pytest - -from snuba.snapshots import ( # NOQA - ColumnConfig, - DateFormatPrecision, - DateTimeFormatterConfig, -) -from snuba.snapshots.postgres_snapshot import PostgresSnapshot - -META_FILE = """ -{ - "snapshot_id": "50a86ad6-b4b7-11e9-a46f-acde48001122", - "product": "snuba", - "transactions": { - "xmin": 3372750, - "xmax": 3372754, - "xip_list": [] - }, - "content": [ - { - "table": "sentry_groupedmessage", - "zip": false, - "columns": [ - {"name": "id"}, - {"name": "status"} - ] - }, - { - "table": "sentry_groupasignee", - "zip": true, - "columns": [ - {"name": "id"}, - { - "name": "a_date", - "formatter": { - "type": "datetime", - "precision": "second" - } - } - ] - } - ], - "start_timestamp": 1564703503.682226 -} -""" - - -class TestPostgresSnapshot: - def __prepare_directory(self, tmp_path: Path, table_content: str) -> str: - snapshot_base = tmp_path / "cdc-snapshot" - snapshot_base.mkdir() - meta = snapshot_base / "metadata.json" - meta.write_text(META_FILE) - tables_dir = tmp_path / "cdc-snapshot" / "tables" - tables_dir.mkdir() - groupedmessage = tables_dir / "sentry_groupedmessage.csv" - groupedmessage.write_text(table_content) - groupassignee = tables_dir / "sentry_groupasignee" - groupassignee.write_text( - """id,project_id -""" - ) - return str(snapshot_base) - - def test_parse_snapshot(self, tmp_path: Path) -> None: - snapshot_base = self.__prepare_directory( - tmp_path, - """id,status -0,1 -""", - ) - snapshot = PostgresSnapshot.load("snuba", snapshot_base) - descriptor = snapshot.get_descriptor() - assert descriptor.id == "50a86ad6-b4b7-11e9-a46f-acde48001122" - assert descriptor.xmax == 3372754 - assert descriptor.xmin == 3372750 - assert descriptor.xip_list == [] - tables = { - table_config.table: (table_config.columns, table_config.zip) - for table_config in descriptor.tables - } - assert "sentry_groupedmessage" in tables - assert tables["sentry_groupedmessage"] == ( - [ColumnConfig("id"), ColumnConfig("status")], - False, - ) - assert "sentry_groupasignee" in tables - assert tables["sentry_groupasignee"] == ( - [ - ColumnConfig("id"), - ColumnConfig( - "a_date", - formatter=DateTimeFormatterConfig( - precision=DateFormatPrecision.SECOND - ), - ), - ], - True, - ) - - with snapshot.get_parsed_table_file("sentry_groupedmessage") as table: - assert next(table) == { - "id": "0", - "status": "1", - } - - with snapshot.get_preprocessed_table_file("sentry_groupedmessage") as table: - assert next(table) == b"id,status\n0,1\n" - - def test_parse_invalid_snapshot(self, tmp_path: Path) -> None: - snapshot_base = self.__prepare_directory( - tmp_path, - """id -0 -""", - ) - with pytest.raises(ValueError, match=".+sentry_groupedmessage.+status.+"): - snapshot = PostgresSnapshot.load("snuba", snapshot_base) - with snapshot.get_parsed_table_file("sentry_groupedmessage") as table: - next(table) From 85648d007c62b8bafb5ac425a340f696a12f5f65 Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Wed, 22 Jan 2025 15:33:14 -0800 Subject: [PATCH 05/10] delete more tests --- .../datasets/storages/test_storage_factory.py | 2 - tests/migrations/test_runner_individual.py | 
55 +------------------ 2 files changed, 1 insertion(+), 56 deletions(-) diff --git a/tests/datasets/storages/test_storage_factory.py b/tests/datasets/storages/test_storage_factory.py index 2de2a91d87b..d538e58677c 100644 --- a/tests/datasets/storages/test_storage_factory.py +++ b/tests/datasets/storages/test_storage_factory.py @@ -13,8 +13,6 @@ StorageKey.DISCOVER, StorageKey.ERRORS, StorageKey.ERRORS_RO, - StorageKey.GROUPEDMESSAGES, - StorageKey.GROUPASSIGNEES, StorageKey.METRICS_COUNTERS, StorageKey.ORG_METRICS_COUNTERS, StorageKey.METRICS_DISTRIBUTIONS, diff --git a/tests/migrations/test_runner_individual.py b/tests/migrations/test_runner_individual.py index 18e673f70b4..4fa8138a082 100644 --- a/tests/migrations/test_runner_individual.py +++ b/tests/migrations/test_runner_individual.py @@ -11,7 +11,7 @@ from snuba.consumers.types import KafkaMessageMetadata from snuba.datasets.storages.factory import get_writable_storage from snuba.datasets.storages.storage_key import StorageKey -from snuba.migrations.groups import MigrationGroup, get_group_loader +from snuba.migrations.groups import MigrationGroup from snuba.migrations.runner import MigrationKey, Runner from snuba.migrations.status import Status from snuba.processor import InsertBatch @@ -146,59 +146,6 @@ def generate_transactions() -> None: ).write(rows) -@pytest.mark.clickhouse_db -def test_groupedmessages_compatibility() -> None: - cluster = get_cluster(StorageSetKey.EVENTS) - - # Ignore the multi node mode because this tests a migration - # for an older table state that only applied to single node - if not cluster.is_single_node(): - return - - database = cluster.get_database() - connection = cluster.get_query_connection(ClickhouseClientSettings.MIGRATE) - - # Create old style table witihout project ID - connection.execute( - """ - CREATE TABLE groupedmessage_local (`offset` UInt64, `record_deleted` UInt8, - `id` UInt64, `status` Nullable(UInt8), `last_seen` Nullable(DateTime), - `first_seen` Nullable(DateTime), `active_at` Nullable(DateTime), - `first_release_id` Nullable(UInt64)) ENGINE = ReplacingMergeTree(offset) - ORDER BY id SAMPLE BY id SETTINGS index_granularity = 8192 - """ - ) - - migration_id = "0010_groupedmessages_onpremise_compatibility" - - runner = Runner() - runner.run_migration( - MigrationKey(MigrationGroup.SYSTEM, "0001_migrations"), force=True - ) - events_migrations = get_group_loader(MigrationGroup.EVENTS).get_migrations() - - # Mark prior migrations complete - for migration in events_migrations[: (events_migrations.index(migration_id))]: - runner._update_migration_status( - MigrationKey(MigrationGroup.EVENTS, migration), Status.COMPLETED - ) - - runner.run_migration( - MigrationKey(MigrationGroup.EVENTS, migration_id), - force=True, - ) - - outcome = perform_select_query( - ["primary_key"], - "system.tables", - {"name": "groupedmessage_local", "database": str(database)}, - None, - connection, - ) - - assert outcome == [("project_id, id",)] - - @pytest.mark.clickhouse_db def run_prior_migrations( migration_group: MigrationGroup, stop_migration_id: str, runner: Runner From 6891c12892ce0c1939a0a70d528dc735d90bf9b5 Mon Sep 17 00:00:00 2001 From: Volo Kluev Date: Wed, 22 Jan 2025 15:46:29 -0800 Subject: [PATCH 06/10] remove some tests and migrations --- snuba/datasets/cdc/row_processors.py | 12 ----- .../events/0007_groupedmessages.py | 46 +++---------------- .../events/0008_groupassignees.py | 45 +++--------------- ...groupedmessages_onpremise_compatibility.py | 13 +----- 4 files changed, 14 insertions(+), 102 
deletions(-) diff --git a/snuba/datasets/cdc/row_processors.py b/snuba/datasets/cdc/row_processors.py index 9e7f4d0ddec..2f312eb859f 100644 --- a/snuba/datasets/cdc/row_processors.py +++ b/snuba/datasets/cdc/row_processors.py @@ -1,8 +1,6 @@ from abc import ABC, abstractmethod from typing import Type, cast -from snuba.datasets.cdc.groupassignee_processor import GroupAssigneeRow -from snuba.datasets.cdc.groupedmessage_processor import GroupedMessageRow from snuba.snapshots import SnapshotTableRow from snuba.utils.registered_class import RegisteredClass from snuba.writer import WriterTableRow @@ -24,13 +22,3 @@ def get_from_name(cls, name: str) -> Type["CdcRowProcessor"]: @classmethod def config_key(cls) -> str: return cls.__name__ - - -class GroupAssigneeRowProcessor(CdcRowProcessor): - def process(self, row: SnapshotTableRow) -> WriterTableRow: - return GroupAssigneeRow.from_bulk(row).to_clickhouse() - - -class GroupedMessageRowProcessor(CdcRowProcessor): - def process(self, row: SnapshotTableRow) -> WriterTableRow: - return GroupedMessageRow.from_bulk(row).to_clickhouse() diff --git a/snuba/snuba_migrations/events/0007_groupedmessages.py b/snuba/snuba_migrations/events/0007_groupedmessages.py index 2c0c9e3b0f0..c55a9f6faf8 100644 --- a/snuba/snuba_migrations/events/0007_groupedmessages.py +++ b/snuba/snuba_migrations/events/0007_groupedmessages.py @@ -1,8 +1,7 @@ from typing import Sequence from snuba.clickhouse.columns import Column, DateTime, UInt -from snuba.clusters.storage_sets import StorageSetKey -from snuba.migrations import migration, operations, table_engines +from snuba.migrations import migration, operations from snuba.migrations.columns import MigrationModifiers as Modifiers columns: Sequence[Column[Modifiers]] = [ @@ -23,51 +22,18 @@ Column("first_release_id", UInt(64, Modifiers(nullable=True))), ] - +# NOTE: CDC storage deprecated class Migration(migration.ClickhouseNodeMigrationLegacy): blocking = False def forwards_local(self) -> Sequence[operations.SqlOperation]: - return [ - operations.CreateTable( - storage_set=StorageSetKey.CDC, - table_name="groupedmessage_local", - columns=columns, - engine=table_engines.ReplacingMergeTree( - storage_set=StorageSetKey.CDC, - version_column="offset", - order_by="(project_id, id)", - sample_by="id", - unsharded=True, - ), - ) - ] + return [] def backwards_local(self) -> Sequence[operations.SqlOperation]: - return [ - operations.DropTable( - storage_set=StorageSetKey.CDC, - table_name="groupedmessage_local", - ) - ] + return [] def forwards_dist(self) -> Sequence[operations.SqlOperation]: - return [ - operations.CreateTable( - storage_set=StorageSetKey.CDC, - table_name="groupedmessage_dist", - columns=columns, - engine=table_engines.Distributed( - local_table_name="groupedmessage_local", - sharding_key=None, - ), - ) - ] + return [] def backwards_dist(self) -> Sequence[operations.SqlOperation]: - return [ - operations.DropTable( - storage_set=StorageSetKey.CDC, - table_name="groupedmessage_dist", - ) - ] + return [] diff --git a/snuba/snuba_migrations/events/0008_groupassignees.py b/snuba/snuba_migrations/events/0008_groupassignees.py index 7440db83df5..9e94a2149fc 100644 --- a/snuba/snuba_migrations/events/0008_groupassignees.py +++ b/snuba/snuba_migrations/events/0008_groupassignees.py @@ -1,8 +1,7 @@ from typing import Sequence from snuba.clickhouse.columns import Column, DateTime, UInt -from snuba.clusters.storage_sets import StorageSetKey -from snuba.migrations import migration, operations, table_engines +from snuba.migrations 
import migration, operations from snuba.migrations.columns import MigrationModifiers as Modifiers columns: Sequence[Column[Modifiers]] = [ @@ -17,50 +16,18 @@ Column("team_id", UInt(64, Modifiers(nullable=True))), ] - +# NOTE: CDC storage deprecated class Migration(migration.ClickhouseNodeMigrationLegacy): blocking = False def forwards_local(self) -> Sequence[operations.SqlOperation]: - return [ - operations.CreateTable( - storage_set=StorageSetKey.CDC, - table_name="groupassignee_local", - columns=columns, - engine=table_engines.ReplacingMergeTree( - storage_set=StorageSetKey.CDC, - version_column="offset", - order_by="(project_id, group_id)", - unsharded=True, - ), - ) - ] + return [] def backwards_local(self) -> Sequence[operations.SqlOperation]: - return [ - operations.DropTable( - storage_set=StorageSetKey.CDC, - table_name="groupassignee_local", - ) - ] + return [] def forwards_dist(self) -> Sequence[operations.SqlOperation]: - return [ - operations.CreateTable( - storage_set=StorageSetKey.CDC, - table_name="groupassignee_dist", - columns=columns, - engine=table_engines.Distributed( - local_table_name="groupassignee_local", - sharding_key=None, - ), - ) - ] + return [] def backwards_dist(self) -> Sequence[operations.SqlOperation]: - return [ - operations.DropTable( - storage_set=StorageSetKey.CDC, - table_name="groupassignee_dist", - ) - ] + return [] diff --git a/snuba/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py b/snuba/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py index 407df2316b4..9312d897d63 100644 --- a/snuba/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py +++ b/snuba/snuba_migrations/events/0010_groupedmessages_onpremise_compatibility.py @@ -102,16 +102,7 @@ class Migration(migration.CodeMigration): blocking = True def forwards_global(self) -> Sequence[operations.RunPython]: - return [ - operations.RunPython( - func=fix_order_by, description="Sync project ID colum for onpremise" - ), - ] + return [] def backwards_global(self) -> Sequence[operations.RunPython]: - return [ - operations.RunPython( - func=ensure_drop_temporary_tables, - description="Ensure temporary tables created by the migration are dropped", - ) - ] + return [] From 58b4af4b430e08ef65675a62f18db5d3307f315a Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 22 Jan 2026 14:26:55 -0800 Subject: [PATCH 07/10] ref: Remove groupedmessage entity and CDC-related join relationships Remove the `grouped` join relationship from events and discover_events entities as part of removing the groupedmessage CDC entity. Update test files to use profiles entity for join test fixtures instead. 
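For reference, the removed `grouped` relationship backed SnQL joins of the shape below (a sketch adapted from the test_cdc_events.py fixtures deleted in this patch; the project id, group id, and timestamp literals are illustrative placeholders, not values from this change):

    MATCH (e: events) -[grouped]-> (g: groupedmessage)
    SELECT e.event_id
    WHERE e.project_id = 1
      AND g.project_id = 1
      AND g.id = 10
      AND e.timestamp >= toDateTime('2025-01-01T00:00:00')
      AND e.timestamp < toDateTime('2025-01-02T00:00:00')
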
Co-Authored-By: Claude --- .../discover/entities/discover_events.yaml | 6 - .../configuration/events/entities/events.yaml | 6 - snuba/query/joins/equivalence_adder.py | 4 +- tests/clickhouse/test_query_format.py | 24 +-- tests/clickhouse/test_query_profiler.py | 8 +- tests/datasets/test_cdc_events.py | 180 ------------------ tests/datasets/test_dataset_factory.py | 2 - tests/datasets/test_entity_factory.py | 1 - tests/datasets/test_fast_bulk_load.py | 61 ------ .../test_entity_processing_stage_composite.py | 4 +- ...test_storage_processing_stage_composite.py | 4 +- tests/query/data_source/test_join.py | 8 +- tests/query/formatters/test_query.py | 23 +-- tests/query/joins/equivalence_schema.py | 4 +- tests/query/joins/join_structures.py | 4 +- tests/query/joins/test_equivalence_adder.py | 30 +-- tests/query/joins/test_equivalences.py | 70 +++---- tests/query/joins/test_subqueries.py | 72 ++----- tests/query/snql/test_invalid_queries.py | 2 +- tests/query/snql/test_query.py | 10 +- tests/query/test_nested.py | 2 +- tests/test_snql_api.py | 20 +- tests/test_snql_sdk_api.py | 4 +- tests/test_writer.py | 21 +- 24 files changed, 119 insertions(+), 451 deletions(-) delete mode 100644 tests/datasets/test_cdc_events.py delete mode 100644 tests/datasets/test_fast_bulk_load.py diff --git a/snuba/datasets/configuration/discover/entities/discover_events.yaml b/snuba/datasets/configuration/discover/entities/discover_events.yaml index 8565476445e..9fc05f4cf4e 100644 --- a/snuba/datasets/configuration/discover/entities/discover_events.yaml +++ b/snuba/datasets/configuration/discover/entities/discover_events.yaml @@ -545,12 +545,6 @@ subscription_validators: required_time_column: timestamp join_relationships: - grouped: - rhs_entity: groupedmessage - join_type: inner - columns: - - [project_id, project_id] - - [group_id, id] assigned: rhs_entity: groupassignee join_type: inner diff --git a/snuba/datasets/configuration/events/entities/events.yaml b/snuba/datasets/configuration/events/entities/events.yaml index 22483d5c6d7..4a9bab9726c 100644 --- a/snuba/datasets/configuration/events/entities/events.yaml +++ b/snuba/datasets/configuration/events/entities/events.yaml @@ -541,12 +541,6 @@ subscription_validators: allows_group_by_without_condition: true join_relationships: - grouped: - rhs_entity: groupedmessage - join_type: inner - columns: - - [project_id, project_id] - - [group_id, id] assigned: rhs_entity: groupassignee join_type: inner diff --git a/snuba/query/joins/equivalence_adder.py b/snuba/query/joins/equivalence_adder.py index 87078b2882e..b0fa317dfcf 100644 --- a/snuba/query/joins/equivalence_adder.py +++ b/snuba/query/joins/equivalence_adder.py @@ -21,9 +21,9 @@ def add_equivalent_conditions(query: CompositeQuery[Entity]) -> None: equivalent in another entity in the join and add the same condition on the equivalent column. - Example: In a join between events and groupedmessage, if there is + Example: In a join between events and profiles, if there is a condition on events.project_id, it would replicate the same - condition on groupedmessage.project_id as this is a semantically + condition on profiles.project_id as this is a semantically equivalent column. 
The goal is to reduce the amount of data that is loaded by clickhouse diff --git a/tests/clickhouse/test_query_format.py b/tests/clickhouse/test_query_format.py index 53c9262e602..17c826d4cf4 100644 --- a/tests/clickhouse/test_query_format.py +++ b/tests/clickhouse/test_query_format.py @@ -56,7 +56,7 @@ ) node_group = IndividualNode( alias="groups", - data_source=Table("groupedmessage_local", GROUPS_SCHEMA, storage_key=StorageKey("groups")), + data_source=Table("profiles_local", GROUPS_SCHEMA, storage_key=StorageKey("groups")), ) node_assignee = IndividualNode( alias="assignee", @@ -381,7 +381,7 @@ [ ["errors_local", "err"], "INNER JOIN", - ["groupedmessage_local", "groups"], + ["profiles_local", "groups"], "ON", ["err.group_id=groups.id"], ], @@ -390,13 +390,13 @@ ], ( "SELECT (err.event_id AS error_id), (groups.message AS message) " - "FROM errors_local err INNER JOIN groupedmessage_local groups " + "FROM errors_local err INNER JOIN profiles_local groups " "ON err.group_id=groups.id " "WHERE eq(groups.id, 1)" ), ( "SELECT (err.event_id AS error_id), (groups.message AS message) " - "FROM errors_local err INNER JOIN groupedmessage_local groups " + "FROM errors_local err INNER JOIN profiles_local groups " "ON err.group_id=groups.id " "WHERE eq(groups.id, -1337)" ), @@ -433,7 +433,7 @@ alias="groups", data_source=Query( from_clause=Table( - "groupedmessage_local", + "profiles_local", GROUPS_SCHEMA, storage_key=StorageKey("dontmatter"), ), @@ -506,7 +506,7 @@ [ [ "SELECT id, message", - ["FROM", "groupedmessage_local"], + ["FROM", "profiles_local"], "WHERE eq(project_id, 1)", ], "groups", @@ -534,7 +534,7 @@ "FROM " "(SELECT (event_id AS error_id), group_id FROM errors_local WHERE eq(project_id, 1)) err " "INNER JOIN " - "(SELECT id, message FROM groupedmessage_local WHERE eq(project_id, 1)) groups " + "(SELECT id, message FROM profiles_local WHERE eq(project_id, 1)) groups " "ON err.group_id=groups.id " "INNER JOIN " "(SELECT group_id FROM groupassignee_local WHERE eq(user, 'me')) assignee " @@ -546,7 +546,7 @@ "FROM " "(SELECT (event_id AS error_id), group_id FROM errors_local WHERE eq(project_id, -1337)) err " "INNER JOIN " - "(SELECT id, message FROM groupedmessage_local WHERE eq(project_id, -1337)) groups " + "(SELECT id, message FROM profiles_local WHERE eq(project_id, -1337)) groups " "ON err.group_id=groups.id " "INNER JOIN " "(SELECT group_id FROM groupassignee_local WHERE eq(user, '$S')) assignee " @@ -688,7 +688,7 @@ def test_delete_query() -> None: [ PaddingNode(None, StringNode("errors_local"), "err"), StringNode("SEMI INNER JOIN"), - PaddingNode(None, StringNode("groupedmessage_local"), "groups"), + PaddingNode(None, StringNode("profiles_local"), "groups"), StringNode("ON"), SequenceNode( [ @@ -700,7 +700,7 @@ def test_delete_query() -> None: ] ), ( - "errors_local err SEMI INNER JOIN groupedmessage_local groups " + "errors_local err SEMI INNER JOIN profiles_local groups " "ON err.group_id=groups.id AND err.project_id=groups.project_id" ), id="Simple join", @@ -734,7 +734,7 @@ def test_delete_query() -> None: [ PaddingNode(None, StringNode("errors_local"), "err"), StringNode("SEMI INNER JOIN"), - PaddingNode(None, StringNode("groupedmessage_local"), "groups"), + PaddingNode(None, StringNode("profiles_local"), "groups"), StringNode("ON"), SequenceNode([StringNode("err.group_id=groups.id")], " AND "), ] @@ -746,7 +746,7 @@ def test_delete_query() -> None: ] ), ( - "errors_local err SEMI INNER JOIN groupedmessage_local groups " + "errors_local err SEMI INNER JOIN profiles_local 
groups " "ON err.group_id=groups.id INNER JOIN groupassignee_local assignee " "ON err.group_id=assignee.id" ), diff --git a/tests/clickhouse/test_query_profiler.py b/tests/clickhouse/test_query_profiler.py index 41bd971dfc3..d8f307548b3 100644 --- a/tests/clickhouse/test_query_profiler.py +++ b/tests/clickhouse/test_query_profiler.py @@ -165,11 +165,11 @@ ), ClickhouseQueryProfile( time_range=31, - table="groupedmessage_local,sentry_errors", + table="profiles_local,sentry_errors", all_columns={ "sentry_errors.group_id", "sentry_errors.timestamp", - "groupedmessage_local.id", + "profiles_local.id", "sentry_errors.tags.key", "sentry_errors.tags.value", }, @@ -218,11 +218,11 @@ ), ClickhouseQueryProfile( time_range=None, - table="groupedmessage_local,sentry_errors", + table="profiles_local,sentry_errors", all_columns={ "sentry_errors.timestamp", "sentry_errors.group_id", - "groupedmessage_local.id", + "profiles_local.id", }, multi_level_condition=True, where_profile=FilterProfile( diff --git a/tests/datasets/test_cdc_events.py b/tests/datasets/test_cdc_events.py deleted file mode 100644 index 12b8a76c8df..00000000000 --- a/tests/datasets/test_cdc_events.py +++ /dev/null @@ -1,180 +0,0 @@ -from datetime import datetime, timedelta, timezone -from functools import partial - -import pytest -import simplejson as json - -from snuba.datasets.entities.entity_key import EntityKey -from snuba.datasets.entities.factory import get_entity -from snuba.utils.metrics.backends.dummy import DummyMetricsBackend -from tests.base import BaseApiTest -from tests.fixtures import get_raw_event -from tests.helpers import write_unprocessed_events - -TEST_GROUP_JOIN_PARAMS = [ - pytest.param( - "(e: events) -[grouped]-> (g: groupedmessage)", - "=", - 1, - id="events groups join on existing group", - ), - pytest.param( - "(e: events) -[grouped]-> (g: groupedmessage)", - "!=", - 0, - id="events groups join on non existing group", - ), - pytest.param( - "(g: groupedmessage) -[groups]-> (e: events)", - "=", - 1, - id="groups events join on existing group", - ), - pytest.param( - "(g: groupedmessage) -[groups]-> (e: events)", - "!=", - 0, - id="groups events join on non existing group", - ), -] - -TEST_ASSIGNEE_JOIN_PARAMS = [ - pytest.param( - "(e: events) -[assigned]-> (a: groupassignee)", - "=", - 1, - id="events assignees join on existing user", - ), - pytest.param( - "(a: groupassignee) -[owns]-> (e: events)", - "=", - 1, - id="assignees events join on existing user", - ), -] - - -class TestCdcEvents(BaseApiTest): - @pytest.fixture(autouse=True) - def setup_fixture(self, clickhouse_db, redis_db): - self.app.post = partial(self.app.post, headers={"referer": "test"}) - self.event = get_raw_event() - self.project_id = self.event["project_id"] - self.base_time = datetime.utcnow().replace( - second=0, microsecond=0, tzinfo=timezone.utc - ) - timedelta(minutes=90) - self.next_time = self.base_time + timedelta(minutes=95) - - self.events_storage = get_entity(EntityKey.EVENTS).get_writable_storage() - write_unprocessed_events(self.events_storage, [self.event]) - - groups = [ - { - "offset": 0, - "project_id": self.project_id, - "id": self.event["group_id"], - "record_deleted": 0, - "status": 0, - } - ] - - groups_storage = get_entity(EntityKey.GROUPEDMESSAGE).get_writable_storage() - groups_storage.get_table_writer().get_batch_writer( - metrics=DummyMetricsBackend(strict=True) - ).write([json.dumps(group).encode("utf-8") for group in groups]) - - assignees = [ - { - "offset": 0, - "project_id": self.project_id, - "group_id": 
self.event["group_id"], - "record_deleted": 0, - "user_id": 100, - } - ] - - assignees_storage = get_entity(EntityKey.GROUPASSIGNEE).get_writable_storage() - assignees_storage.get_table_writer().get_batch_writer( - metrics=DummyMetricsBackend(strict=True) - ).write([json.dumps(assignee).encode("utf-8") for assignee in assignees]) - - @pytest.mark.clickhouse_db - @pytest.mark.redis_db - @pytest.mark.parametrize( - "relationship, operator, expected_rows", TEST_GROUP_JOIN_PARAMS - ) - def test_groups_join( - self, relationship: str, operator: str, expected_rows: int - ) -> None: - query_template = ( - "MATCH %(relationship)s " - "SELECT e.event_id WHERE " - "e.project_id = %(project_id)s AND " - "g.project_id = %(project_id)s AND " - "g.id %(operator)s %(group_id)s AND " - "e.timestamp >= toDateTime('%(btime)s') AND " - "e.timestamp < toDateTime('%(ntime)s')" - ) - - response = self.app.post( - "/events/snql", - data=json.dumps( - { - "dataset": "events", - "query": query_template - % { - "relationship": relationship, - "project_id": self.project_id, - "operator": operator, - "group_id": self.event["group_id"], - "btime": self.base_time, - "ntime": self.next_time, - }, - "tenant_ids": {"organization_id": 1, "referrer": "abcd"}, - } - ), - ) - data = json.loads(response.data) - - assert response.status_code == 200, data - assert len(data["data"]) == expected_rows, data - - @pytest.mark.clickhouse_db - @pytest.mark.redis_db - @pytest.mark.parametrize( - "relationship, operator, expected_rows", TEST_ASSIGNEE_JOIN_PARAMS - ) - def test_assignee_join( - self, relationship: str, operator: str, expected_rows: int - ) -> None: - query_template = ( - "MATCH %(relationship)s " - "SELECT e.event_id WHERE " - "e.project_id = %(project_id)s AND " - "a.project_id = %(project_id)s AND " - "a.user_id %(operator)s 100 AND " - "e.timestamp >= toDateTime('%(btime)s') AND " - "e.timestamp < toDateTime('%(ntime)s')" - ) - - response = self.app.post( - "/events/snql", - data=json.dumps( - { - "dataset": "events", - "query": query_template - % { - "relationship": relationship, - "project_id": self.project_id, - "operator": operator, - "group_id": self.event["group_id"], - "btime": self.base_time, - "ntime": self.next_time, - }, - "tenant_ids": {"organization_id": 1, "referrer": "abcd"}, - } - ), - ) - data = json.loads(response.data) - assert response.status_code == 200, data - assert len(data["data"]) == expected_rows, data diff --git a/tests/datasets/test_dataset_factory.py b/tests/datasets/test_dataset_factory.py index 42a4bd0b41c..3d7988a5642 100644 --- a/tests/datasets/test_dataset_factory.py +++ b/tests/datasets/test_dataset_factory.py @@ -20,7 +20,6 @@ def test_get_dataset() -> None: "events", "events_analytics_platform", "groupassignee", - "groupedmessage", "metrics", "outcomes", "outcomes_raw", @@ -65,7 +64,6 @@ def test_all_names() -> None: "events", "events_analytics_platform", "groupassignee", - "groupedmessage", "metrics", "outcomes", "outcomes_raw", diff --git a/tests/datasets/test_entity_factory.py b/tests/datasets/test_entity_factory.py index f5113858d13..c08d8bcc033 100644 --- a/tests/datasets/test_entity_factory.py +++ b/tests/datasets/test_entity_factory.py @@ -17,7 +17,6 @@ EntityKey.SPANS_NUM_ATTRS, EntityKey.SPANS_STR_ATTRS, EntityKey.GROUPASSIGNEE, - EntityKey.GROUPEDMESSAGE, EntityKey.OUTCOMES, EntityKey.OUTCOMES_RAW, EntityKey.SEARCH_ISSUES, diff --git a/tests/datasets/test_fast_bulk_load.py b/tests/datasets/test_fast_bulk_load.py deleted file mode 100644 index a65b01127fc..00000000000 --- 
a/tests/datasets/test_fast_bulk_load.py +++ /dev/null @@ -1,61 +0,0 @@ -import pytest - -from snuba.clickhouse import DATETIME_FORMAT -from snuba.datasets.cdc.groupedmessage_processor import GroupedMessageRow - - -class TestFastGroupedMessageLoad: - def test_supported_date_format(self) -> None: - """ - This test is to ensure the compatibility between - the clickhouse datetime format and the hardcoded - one in the bulk load process (for performance). - If someone changes the format without updating the - bulk loader this test will fail. - """ - assert DATETIME_FORMAT == "%Y-%m-%d %H:%M:%S" - - def test_basic_date(self) -> None: - message = GroupedMessageRow.from_bulk( - { - "project_id": "2", - "id": "1", - "status": "0", - # UTC date with nanoseconds - "last_seen": "2019-07-01 18:03:07.984123+00", - # UTC date without nanosecods - "first_seen": "2019-07-01 18:03:07+00", - # None date - "active_at": "", - "first_release_id": "0", - } - ) - - assert message.to_clickhouse() == { - "offset": 0, - "project_id": 2, - "id": 1, - "record_deleted": 0, - "status": 0, - "last_seen": "2019-07-01 18:03:07", - "first_seen": "2019-07-01 18:03:07", - "active_at": None, - "first_release_id": 0, - } - - def test_failure(self) -> None: - with pytest.raises(AssertionError): - GroupedMessageRow.from_bulk( - { - "project_id": "2", - "id": "1", - "status": "0", - # Non UTC date with nanoseconds - "last_seen": "2019-07-01 18:03:07.984+05", - # UTC date without nanosecods - "first_seen": "2019-07-01 18:03:07+00", - # another UTC date with less precision - "active_at": "2019-06-25 22:15:57.6+00", - "first_release_id": "0", - } - ) diff --git a/tests/pipeline/test_entity_processing_stage_composite.py b/tests/pipeline/test_entity_processing_stage_composite.py index 7052dceaca7..246d140c8d4 100644 --- a/tests/pipeline/test_entity_processing_stage_composite.py +++ b/tests/pipeline/test_entity_processing_stage_composite.py @@ -57,8 +57,8 @@ mandatory_conditions=events_storage.get_schema().get_data_source().get_mandatory_conditions(), ) -groups_ent = Entity(EntityKey.GROUPEDMESSAGE, get_entity(EntityKey.GROUPEDMESSAGE).get_data_model()) -groups_storage = get_storage(StorageKey.GROUPEDMESSAGES) +groups_ent = Entity(EntityKey.PROFILES, get_entity(EntityKey.PROFILES).get_data_model()) +groups_storage = get_storage(StorageKey.PROFILES) groups_schema = groups_storage.get_schema() assert isinstance(groups_schema, TableSchema) diff --git a/tests/pipeline/test_storage_processing_stage_composite.py b/tests/pipeline/test_storage_processing_stage_composite.py index 211a0b47e06..e02d6c6bf1b 100644 --- a/tests/pipeline/test_storage_processing_stage_composite.py +++ b/tests/pipeline/test_storage_processing_stage_composite.py @@ -46,8 +46,8 @@ mandatory_conditions=events_storage.get_schema().get_data_source().get_mandatory_conditions(), ) -groups_ent = Entity(EntityKey.GROUPEDMESSAGE, get_entity(EntityKey.GROUPEDMESSAGE).get_data_model()) -groups_storage = get_storage(StorageKey.GROUPEDMESSAGES) +groups_ent = Entity(EntityKey.PROFILES, get_entity(EntityKey.PROFILES).get_data_model()) +groups_storage = get_storage(StorageKey.PROFILES) groups_schema = groups_storage.get_schema() assert isinstance(groups_schema, TableSchema) diff --git a/tests/query/data_source/test_join.py b/tests/query/data_source/test_join.py index d59b454b1e2..f751539c778 100644 --- a/tests/query/data_source/test_join.py +++ b/tests/query/data_source/test_join.py @@ -13,9 +13,7 @@ from snuba.query.expressions import Column from snuba.query.logical import Query 
-ERRORS_SCHEMA = ColumnSet( - [("event_id", UUID()), ("message", String()), ("group_id", UInt(32))] -) +ERRORS_SCHEMA = ColumnSet([("event_id", UUID()), ("message", String()), ("group_id", UInt(32))]) GROUPS_SCHEMA = ColumnSet([("id", UInt(32)), ("message", String())]) @@ -33,7 +31,7 @@ def test_simple_join() -> None: e = Entity(key=EntityKey.EVENTS, schema=ERRORS_SCHEMA) node_err = IndividualNode(alias="err", data_source=e) - g = Entity(key=EntityKey.GROUPEDMESSAGE, schema=GROUPS_SCHEMA) + g = Entity(key=EntityKey.PROFILES, schema=GROUPS_SCHEMA) node_group = IndividualNode(alias="groups", data_source=g) join = JoinClause( @@ -62,7 +60,7 @@ def test_complex_joins() -> None: e = Entity(key=EntityKey.EVENTS, schema=ERRORS_SCHEMA) node_err = IndividualNode(alias="err", data_source=e) - g = Entity(key=EntityKey.GROUPEDMESSAGE, schema=GROUPS_SCHEMA) + g = Entity(key=EntityKey.PROFILES, schema=GROUPS_SCHEMA) node_group = IndividualNode(alias="groups", data_source=g) a = Entity(key=EntityKey.GROUPASSIGNEE, schema=GROUPS_ASSIGNEE) diff --git a/tests/query/formatters/test_query.py b/tests/query/formatters/test_query.py index e70c8b9f236..d074c16c7f4 100644 --- a/tests/query/formatters/test_query.py +++ b/tests/query/formatters/test_query.py @@ -2,9 +2,8 @@ import pytest -from snuba.clickhouse.columns import ColumnSet +from snuba.clickhouse.columns import ColumnSet, UInt from snuba.clickhouse.columns import SchemaModifiers as Modifiers -from snuba.clickhouse.columns import UInt from snuba.clickhouse.query import Query as ClickhouseQuery from snuba.datasets.entities.entity_key import EntityKey from snuba.datasets.storages.storage_key import StorageKey @@ -39,7 +38,7 @@ ), right_node=IndividualNode( alias="gr", - data_source=Entity(EntityKey.GROUPEDMESSAGE, GROUPS_SCHEMA, None), + data_source=Entity(EntityKey.PROFILES, GROUPS_SCHEMA, None), ), keys=[ JoinCondition( @@ -61,14 +60,10 @@ from_clause=Entity(EntityKey.EVENTS, EVENTS_SCHEMA, 0.5), selected_columns=[ SelectedExpression("c1", Column("_snuba_c1", "t", "c")), - SelectedExpression( - "f1", FunctionCall("_snuba_f1", "f", (Column(None, "t", "c2"),)) - ), + SelectedExpression("f1", FunctionCall("_snuba_f1", "f", (Column(None, "t", "c2"),))), ], array_join=Column(None, None, "col"), - condition=binary_condition( - "equals", Column(None, None, "c4"), Literal(None, "asd") - ), + condition=binary_condition("equals", Column(None, None, "c4"), Literal(None, "asd")), groupby=[Column(None, "t", "c4")], having=binary_condition("equals", Column(None, None, "c6"), Literal(None, "asd2")), order_by=[OrderBy(OrderByDirection.ASC, Column(None, "t", "c"))], @@ -116,9 +111,7 @@ CompositeQuery( from_clause=LOGICAL_QUERY, selected_columns=[ - SelectedExpression( - "f", FunctionCall("f", "avg", (Column(None, "t", "c"),)) - ) + SelectedExpression("f", FunctionCall("f", "avg", (Column(None, "t", "c"),))) ], ), [ @@ -174,7 +167,7 @@ "FROM", " ['Entity(events)'] AS `ev`", " INNER JOIN", - " ['Entity(groupedmessage)'] AS `gr`", + " ['Entity(profiles)'] AS `gr`", " ON", " ev.group_id", " gr.id", @@ -187,9 +180,7 @@ from_clause=CompositeQuery( from_clause=SIMPLE_SELECT_QUERY, selected_columns=[ - SelectedExpression( - "f", FunctionCall("f", "avg", (Column(None, "t", "c"),)) - ) + SelectedExpression("f", FunctionCall("f", "avg", (Column(None, "t", "c"),))) ], ), selected_columns=[SelectedExpression("tc", Column(None, "t", "c"))], diff --git a/tests/query/joins/equivalence_schema.py b/tests/query/joins/equivalence_schema.py index 65eb5040bd8..d29babe1a5f 100644 --- 
a/tests/query/joins/equivalence_schema.py +++ b/tests/query/joins/equivalence_schema.py @@ -48,7 +48,7 @@ def __init__(self) -> None: abstract_column_set=EVENTS_SCHEMA, join_relationships={ "grouped": JoinRelationship( - rhs_entity=EntityKey.GROUPEDMESSAGE, + rhs_entity=EntityKey.PROFILES, columns=[("group_id", "id")], join_type=JoinType.INNER, equivalences=[ColumnEquivalence("project_id", "project_id")], @@ -81,7 +81,7 @@ def __init__(self) -> None: ) -class GroupedMessage(FakeEntity): +class Profiles(FakeEntity): def __init__(self) -> None: super().__init__( storages=[], diff --git a/tests/query/joins/join_structures.py b/tests/query/joins/join_structures.py index 09216f2786f..4a0e7755e1b 100644 --- a/tests/query/joins/join_structures.py +++ b/tests/query/joins/join_structures.py @@ -66,7 +66,7 @@ def groups_node( ) -> IndividualNode[Entity]: return build_node( "gr", - Entity(EntityKey.GROUPEDMESSAGE, ColumnSet(GROUPS_SCHEMA.columns)), + Entity(EntityKey.PROFILES, ColumnSet(GROUPS_SCHEMA.columns)), selected_columns, condition, granularity, @@ -111,7 +111,7 @@ def clickhouse_groups_node( ) -> IndividualNode[Table]: return build_clickhouse_node( "gr", - Table("groupedmessage_local", GROUPS_SCHEMA, storage_key=StorageKey("dontmatter")), + Table("profiles_local", GROUPS_SCHEMA, storage_key=StorageKey("dontmatter")), selected_columns, condition, ) diff --git a/tests/query/joins/test_equivalence_adder.py b/tests/query/joins/test_equivalence_adder.py index bfc8d751cd5..2b2be3e801a 100644 --- a/tests/query/joins/test_equivalence_adder.py +++ b/tests/query/joins/test_equivalence_adder.py @@ -31,7 +31,7 @@ EVENTS_SCHEMA, GROUPS_SCHEMA, Events, - GroupedMessage, + Profiles, ) @@ -46,14 +46,12 @@ def test_classify_and_replace() -> None: assert condition.transform( partial(_replace_col, "ev", "project_id", "gr", "project_id") - ) == binary_condition( - ConditionFunctions.EQ, Column(None, "gr", "project_id"), Literal(None, 1) - ) + ) == binary_condition(ConditionFunctions.EQ, Column(None, "gr", "project_id"), Literal(None, 1)) ENTITY_GROUP_JOIN = JoinClause( IndividualNode("ev", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None)), - IndividualNode("gr", EntitySource(EntityKey.GROUPEDMESSAGE, GROUPS_SCHEMA, None)), + IndividualNode("gr", EntitySource(EntityKey.PROFILES, GROUPS_SCHEMA, None)), [ JoinCondition( JoinConditionExpression("ev", "group_id"), @@ -67,19 +65,13 @@ def test_classify_and_replace() -> None: TEST_REPLACEMENT = [ pytest.param( - binary_condition( - ConditionFunctions.EQ, Column(None, "ev", "event_id"), Literal(None, 1) - ), + binary_condition(ConditionFunctions.EQ, Column(None, "ev", "event_id"), Literal(None, 1)), ENTITY_GROUP_JOIN, - binary_condition( - ConditionFunctions.EQ, Column(None, "ev", "event_id"), Literal(None, 1) - ), + binary_condition(ConditionFunctions.EQ, Column(None, "ev", "event_id"), Literal(None, 1)), id="No condition to add", ), pytest.param( - binary_condition( - ConditionFunctions.EQ, Column(None, "ev", "event_id"), Literal(None, 1) - ), + binary_condition(ConditionFunctions.EQ, Column(None, "ev", "event_id"), Literal(None, 1)), JoinClause( IndividualNode("ev", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None)), IndividualNode("ev2", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None)), @@ -109,9 +101,7 @@ def test_classify_and_replace() -> None: id="Self join. 
Duplicate condition", ), pytest.param( - binary_condition( - ConditionFunctions.EQ, Column(None, "ev", "project_id"), Literal(None, 1) - ), + binary_condition(ConditionFunctions.EQ, Column(None, "ev", "project_id"), Literal(None, 1)), ENTITY_GROUP_JOIN, combine_and_conditions( [ @@ -255,16 +245,14 @@ def test_classify_and_replace() -> None: ] -@pytest.mark.parametrize( - "initial_condition, join_clause, expected_expr", TEST_REPLACEMENT -) +@pytest.mark.parametrize("initial_condition, join_clause, expected_expr", TEST_REPLACEMENT) def test_add_equivalent_condition( initial_condition: Expression, join_clause: JoinClause[EntitySource], expected_expr: Expression, ) -> None: override_entity_map(EntityKey.EVENTS, Events()) - override_entity_map(EntityKey.GROUPEDMESSAGE, GroupedMessage()) + override_entity_map(EntityKey.PROFILES, Profiles()) query = CompositeQuery( from_clause=join_clause, diff --git a/tests/query/joins/test_equivalences.py b/tests/query/joins/test_equivalences.py index f5c63eb4d83..70fd9369bbf 100644 --- a/tests/query/joins/test_equivalences.py +++ b/tests/query/joins/test_equivalences.py @@ -21,16 +21,14 @@ GROUPS_SCHEMA, Events, GroupAssignee, - GroupedMessage, + Profiles, ) TEST_CASES = [ pytest.param( JoinClause( IndividualNode("ev", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None)), - IndividualNode( - "gr", EntitySource(EntityKey.GROUPEDMESSAGE, GROUPS_SCHEMA, None) - ), + IndividualNode("gr", EntitySource(EntityKey.PROFILES, GROUPS_SCHEMA, None)), [ JoinCondition( JoinConditionExpression("ev", "group_id"), @@ -42,15 +40,15 @@ ), { QualifiedCol(EntityKey.EVENTS, "group_id"): { - QualifiedCol(EntityKey.GROUPEDMESSAGE, "id"), + QualifiedCol(EntityKey.PROFILES, "id"), }, - QualifiedCol(EntityKey.GROUPEDMESSAGE, "id"): { + QualifiedCol(EntityKey.PROFILES, "id"): { QualifiedCol(EntityKey.EVENTS, "group_id"), }, QualifiedCol(EntityKey.EVENTS, "project_id"): { - QualifiedCol(EntityKey.GROUPEDMESSAGE, "project_id"), + QualifiedCol(EntityKey.PROFILES, "project_id"), }, - QualifiedCol(EntityKey.GROUPEDMESSAGE, "project_id"): { + QualifiedCol(EntityKey.PROFILES, "project_id"): { QualifiedCol(EntityKey.EVENTS, "project_id"), }, }, @@ -59,12 +57,8 @@ pytest.param( JoinClause( JoinClause( - IndividualNode( - "ev", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None) - ), - IndividualNode( - "as", EntitySource(EntityKey.GROUPASSIGNEE, GROUPS_ASSIGNEE, None) - ), + IndividualNode("ev", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None)), + IndividualNode("as", EntitySource(EntityKey.GROUPASSIGNEE, GROUPS_ASSIGNEE, None)), [ JoinCondition( JoinConditionExpression("ev", "group_id"), @@ -74,9 +68,7 @@ JoinType.INNER, None, ), - IndividualNode( - "gr", EntitySource(EntityKey.GROUPEDMESSAGE, GROUPS_SCHEMA, None) - ), + IndividualNode("gr", EntitySource(EntityKey.PROFILES, GROUPS_SCHEMA, None)), [ JoinCondition( JoinConditionExpression("ev", "group_id"), @@ -88,27 +80,27 @@ ), { QualifiedCol(EntityKey.EVENTS, "group_id"): { - QualifiedCol(EntityKey.GROUPEDMESSAGE, "id"), + QualifiedCol(EntityKey.PROFILES, "id"), QualifiedCol(EntityKey.GROUPASSIGNEE, "group_id"), }, - QualifiedCol(EntityKey.GROUPEDMESSAGE, "id"): { + QualifiedCol(EntityKey.PROFILES, "id"): { QualifiedCol(EntityKey.EVENTS, "group_id"), QualifiedCol(EntityKey.GROUPASSIGNEE, "group_id"), }, QualifiedCol(EntityKey.GROUPASSIGNEE, "group_id"): { QualifiedCol(EntityKey.EVENTS, "group_id"), - QualifiedCol(EntityKey.GROUPEDMESSAGE, "id"), + QualifiedCol(EntityKey.PROFILES, "id"), }, QualifiedCol(EntityKey.EVENTS, "project_id"): { - 
QualifiedCol(EntityKey.GROUPEDMESSAGE, "project_id"), + QualifiedCol(EntityKey.PROFILES, "project_id"), QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"), }, - QualifiedCol(EntityKey.GROUPEDMESSAGE, "project_id"): { + QualifiedCol(EntityKey.PROFILES, "project_id"): { QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"), QualifiedCol(EntityKey.EVENTS, "project_id"), }, QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"): { - QualifiedCol(EntityKey.GROUPEDMESSAGE, "project_id"), + QualifiedCol(EntityKey.PROFILES, "project_id"), QualifiedCol(EntityKey.EVENTS, "project_id"), }, }, @@ -117,12 +109,8 @@ pytest.param( JoinClause( JoinClause( - IndividualNode( - "ev", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None) - ), - IndividualNode( - "gr", EntitySource(EntityKey.GROUPEDMESSAGE, GROUPS_SCHEMA, None) - ), + IndividualNode("ev", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None)), + IndividualNode("gr", EntitySource(EntityKey.PROFILES, GROUPS_SCHEMA, None)), [ JoinCondition( JoinConditionExpression("ev", "group_id"), @@ -132,9 +120,7 @@ JoinType.INNER, None, ), - IndividualNode( - "as", EntitySource(EntityKey.GROUPASSIGNEE, GROUPS_ASSIGNEE, None) - ), + IndividualNode("as", EntitySource(EntityKey.GROUPASSIGNEE, GROUPS_ASSIGNEE, None)), [ JoinCondition( JoinConditionExpression("gr", "user_id"), @@ -146,27 +132,27 @@ ), { QualifiedCol(EntityKey.EVENTS, "group_id"): { - QualifiedCol(EntityKey.GROUPEDMESSAGE, "id"), + QualifiedCol(EntityKey.PROFILES, "id"), }, - QualifiedCol(EntityKey.GROUPEDMESSAGE, "id"): { + QualifiedCol(EntityKey.PROFILES, "id"): { QualifiedCol(EntityKey.EVENTS, "group_id"), }, - QualifiedCol(EntityKey.GROUPEDMESSAGE, "user_id"): { + QualifiedCol(EntityKey.PROFILES, "user_id"): { QualifiedCol(EntityKey.GROUPASSIGNEE, "user_id"), }, QualifiedCol(EntityKey.GROUPASSIGNEE, "user_id"): { - QualifiedCol(EntityKey.GROUPEDMESSAGE, "user_id"), + QualifiedCol(EntityKey.PROFILES, "user_id"), }, QualifiedCol(EntityKey.EVENTS, "project_id"): { - QualifiedCol(EntityKey.GROUPEDMESSAGE, "project_id"), + QualifiedCol(EntityKey.PROFILES, "project_id"), QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"), }, - QualifiedCol(EntityKey.GROUPEDMESSAGE, "project_id"): { + QualifiedCol(EntityKey.PROFILES, "project_id"): { QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"), QualifiedCol(EntityKey.EVENTS, "project_id"), }, QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"): { - QualifiedCol(EntityKey.GROUPEDMESSAGE, "project_id"), + QualifiedCol(EntityKey.PROFILES, "project_id"), QualifiedCol(EntityKey.EVENTS, "project_id"), }, }, @@ -176,11 +162,9 @@ @pytest.mark.parametrize("join, graph", TEST_CASES) -def test_find_equivalences( - join: JoinClause[EntitySource], graph: EquivalenceGraph -) -> None: +def test_find_equivalences(join: JoinClause[EntitySource], graph: EquivalenceGraph) -> None: override_entity_map(EntityKey.EVENTS, Events()) - override_entity_map(EntityKey.GROUPEDMESSAGE, GroupedMessage()) + override_entity_map(EntityKey.PROFILES, Profiles()) override_entity_map(EntityKey.GROUPASSIGNEE, GroupAssignee()) assert get_equivalent_columns(join) == graph diff --git a/tests/query/joins/test_subqueries.py b/tests/query/joins/test_subqueries.py index 97836eafc34..2cd86d82552 100644 --- a/tests/query/joins/test_subqueries.py +++ b/tests/query/joins/test_subqueries.py @@ -29,7 +29,7 @@ GROUPS_SCHEMA, Events, GroupAssignee, - GroupedMessage, + Profiles, ) from tests.query.joins.join_structures import ( events_groups_join, @@ -44,9 +44,7 @@ ), right_node=IndividualNode( alias="gr", - 
data_source=Entity( - EntityKey.GROUPEDMESSAGE, ColumnSet(GROUPS_SCHEMA.columns), None - ), + data_source=Entity(EntityKey.PROFILES, ColumnSet(GROUPS_SCHEMA.columns), None), ), keys=[ JoinCondition( @@ -116,9 +114,7 @@ "_snuba_group_id", Column("_snuba_group_id", None, "id"), ), - SelectedExpression( - "_snuba_id", Column("_snuba_id", None, "id") - ), + SelectedExpression("_snuba_id", Column("_snuba_id", None, "id")), ], ), ), @@ -131,9 +127,7 @@ (Column("_snuba_ev.event_id", "ev", "_snuba_ev.event_id"),), ), ), - SelectedExpression( - "group_id", Column("_snuba_group_id", "gr", "_snuba_group_id") - ), + SelectedExpression("group_id", Column("_snuba_group_id", "gr", "_snuba_group_id")), ], ), id="Basic join with select", @@ -166,9 +160,7 @@ "_snuba_group_id", Column("_snuba_group_id", None, "id"), ), - SelectedExpression( - "_snuba_id", Column("_snuba_id", None, "id") - ), + SelectedExpression("_snuba_id", Column("_snuba_id", None, "id")), ], condition=binary_condition( ConditionFunctions.EQ, @@ -178,9 +170,7 @@ ), ), selected_columns=[ - SelectedExpression( - "group_id", Column("_snuba_group_id", "gr", "_snuba_group_id") - ), + SelectedExpression("group_id", Column("_snuba_group_id", "gr", "_snuba_group_id")), ], ), id="Query with condition", @@ -201,9 +191,7 @@ binary_condition( ConditionFunctions.EQ, Column("_snuba_group_id", "gr", "id"), - FunctionCall( - None, "f", (Column("_snuba_e_group_id", "ev", "group_id"),) - ), + FunctionCall(None, "f", (Column("_snuba_e_group_id", "ev", "group_id"),)), ), ), ), @@ -231,9 +219,7 @@ "_snuba_group_id", Column("_snuba_group_id", None, "id"), ), - SelectedExpression( - "_snuba_id", Column("_snuba_id", None, "id") - ), + SelectedExpression("_snuba_id", Column("_snuba_id", None, "id")), ], binary_condition( ConditionFunctions.EQ, @@ -243,9 +229,7 @@ ), ), selected_columns=[ - SelectedExpression( - "group_id", Column("_snuba_group_id", "gr", "_snuba_group_id") - ), + SelectedExpression("group_id", Column("_snuba_group_id", "gr", "_snuba_group_id")), ], condition=binary_condition( ConditionFunctions.EQ, @@ -291,9 +275,7 @@ "_snuba_group_id", Column("_snuba_group_id", None, "id"), ), - SelectedExpression( - "_snuba_id", Column("_snuba_id", None, "id") - ), + SelectedExpression("_snuba_id", Column("_snuba_id", None, "id")), ], ), ), @@ -318,9 +300,7 @@ join_type=JoinType.INNER, ), selected_columns=[ - SelectedExpression( - "group_id", Column("_snuba_group_id", "gr", "_snuba_group_id") - ), + SelectedExpression("group_id", Column("_snuba_group_id", "gr", "_snuba_group_id")), ], ), id="Multi entity join", @@ -413,9 +393,7 @@ None, "greater", ( - FunctionCall( - None, "min", (Column("_snuba_gen_2", "ev", "_snuba_gen_2"),) - ), + FunctionCall(None, "min", (Column("_snuba_gen_2", "ev", "_snuba_gen_2"),)), Literal(None, "sometime"), ), ), @@ -460,9 +438,7 @@ from_clause=events_groups_join( events_node( [ - SelectedExpression( - "_snuba_a_col", Column("_snuba_a_col", None, "column") - ), + SelectedExpression("_snuba_a_col", Column("_snuba_a_col", None, "column")), SelectedExpression( "_snuba_another_func", FunctionCall( @@ -483,16 +459,12 @@ "_snuba_another_col", Column("_snuba_another_col", None, "another_column"), ), - SelectedExpression( - "_snuba_id", Column("_snuba_id", None, "id") - ), + SelectedExpression("_snuba_id", Column("_snuba_id", None, "id")), ] ), ), selected_columns=[ - SelectedExpression( - "a_col", Column("_snuba_a_col", "ev", "_snuba_a_col") - ), + SelectedExpression("a_col", Column("_snuba_a_col", "ev", "_snuba_a_col")), SelectedExpression( 
"a_func", FunctionCall( @@ -522,22 +494,16 @@ def test_subquery_generator( processed_query: CompositeQuery[Entity], ) -> None: override_entity_map(EntityKey.EVENTS, Events()) - override_entity_map(EntityKey.GROUPEDMESSAGE, GroupedMessage()) + override_entity_map(EntityKey.PROFILES, Profiles()) override_entity_map(EntityKey.GROUPASSIGNEE, GroupAssignee()) generate_subqueries(original_query) - original_map = cast( - JoinClause[Entity], original_query.get_from_clause() - ).get_alias_node_map() - processed_map = cast( - JoinClause[Entity], processed_query.get_from_clause() - ).get_alias_node_map() + original_map = cast(JoinClause[Entity], original_query.get_from_clause()).get_alias_node_map() + processed_map = cast(JoinClause[Entity], processed_query.get_from_clause()).get_alias_node_map() for k, node in original_map.items(): - report = cast(LogicalQuery, node.data_source).equals( - processed_map[k].data_source - ) + report = cast(LogicalQuery, node.data_source).equals(processed_map[k].data_source) assert report[0], f"Failed equality {k}: {report[1]}" report = original_query.equals(processed_query) diff --git a/tests/query/snql/test_invalid_queries.py b/tests/query/snql/test_invalid_queries.py index dda2942298c..b7365ead494 100644 --- a/tests/query/snql/test_invalid_queries.py +++ b/tests/query/snql/test_invalid_queries.py @@ -99,7 +99,7 @@ def test_failures(query_body: str, message: str) -> None: mapping = { "contains": (EntityKey.TRANSACTIONS, "event_id"), "assigned": (EntityKey.GROUPASSIGNEE, "group_id"), - "bookmark": (EntityKey.GROUPEDMESSAGE, "first_release_id"), + "bookmark": (EntityKey.PROFILES, "profile_id"), } def events_mock(relationship: str) -> Optional[JoinRelationship]: diff --git a/tests/query/snql/test_query.py b/tests/query/snql/test_query.py index e33ea66b88b..19f30e76e8e 100644 --- a/tests/query/snql/test_query.py +++ b/tests/query/snql/test_query.py @@ -1028,7 +1028,7 @@ def build_cond(tn: str) -> str: f"""MATCH (e: events) -[contains]-> (t: transactions), (e: events) -[assigned]-> (ga: groupassignee), - (e: events) -[bookmark]-> (gm: groupedmessage), + (e: events) -[bookmark]-> (gm: profiles), (e: events) -[activity]-> (se: metrics_sets) SELECT 4-5, e.event_id, t.event_id, ga.offset, gm.offset, se.metric_id WHERE {build_cond("e")} AND {build_cond("t")} @@ -1064,14 +1064,14 @@ def build_cond(tn: str) -> str: right_node=IndividualNode( "gm", QueryEntity( - EntityKey.GROUPEDMESSAGE, - get_entity(EntityKey.GROUPEDMESSAGE).get_data_model(), + EntityKey.PROFILES, + get_entity(EntityKey.PROFILES).get_data_model(), ), ), keys=[ JoinCondition( JoinConditionExpression("e", "event_id"), - JoinConditionExpression("gm", "first_release_id"), + JoinConditionExpression("gm", "profile_id"), ) ], join_type=JoinType.INNER, @@ -1813,7 +1813,7 @@ def test_format_expressions(query_body: str, expected_query: LogicalQuery) -> No mapping = { "contains": (EntityKey.TRANSACTIONS, "event_id"), "assigned": (EntityKey.GROUPASSIGNEE, "group_id"), - "bookmark": (EntityKey.GROUPEDMESSAGE, "first_release_id"), + "bookmark": (EntityKey.PROFILES, "profile_id"), "activity": (EntityKey.METRICS_SETS, "org_id"), } diff --git a/tests/query/test_nested.py b/tests/query/test_nested.py index 1f56a06488c..e62152fc946 100644 --- a/tests/query/test_nested.py +++ b/tests/query/test_nested.py @@ -53,7 +53,7 @@ def test_join_query() -> None: groups_query = LogicalQuery( Entity( - EntityKey.GROUPEDMESSAGE, + EntityKey.PROFILES, ColumnSet([("id", UInt(32)), ("message", String())]), ), 
selected_columns=[SelectedExpression("group_id", Column("group_id", None, "id"))], diff --git a/tests/test_snql_api.py b/tests/test_snql_api.py index 46f6cf904e8..ea51c15a500 100644 --- a/tests/test_snql_api.py +++ b/tests/test_snql_api.py @@ -211,7 +211,7 @@ def test_join_query(self) -> None: "/events/snql", data=json.dumps( { - "query": f"""MATCH (e: events) -[grouped]-> (gm: groupedmessage) + "query": f"""MATCH (e: events) -[grouped]-> (gm: profiles) SELECT e.group_id, gm.status, avg(e.retention_days) AS avg BY e.group_id, gm.status WHERE e.project_id = {self.project_id} AND gm.project_id = {self.project_id} @@ -630,7 +630,7 @@ def test_multi_table_join(self) -> None: data=json.dumps( { "query": f""" - MATCH (e: events) -[grouped]-> (g: groupedmessage), + MATCH (e: events) -[grouped]-> (g: profiles), (e: events) -[assigned]-> (a: groupassignee) SELECT e.message, e.tags[b], a.user_id, g.last_seen WHERE e.project_id = {self.project_id} @@ -654,7 +654,7 @@ def test_complex_table_join(self) -> None: data=json.dumps( { "query": f""" - MATCH (e: events) -[grouped]-> (g: groupedmessage) + MATCH (e: events) -[grouped]-> (g: profiles) SELECT g.id, toUInt64(plus(multiply(log(count(e.group_id)), 600), multiply(toUInt64(toUInt64(max(e.timestamp))), 1000))) AS score BY g.id WHERE e.project_id IN array({self.project_id}) AND e.timestamp >= toDateTime('2021-07-04T01:09:08.188427') AND e.timestamp < toDateTime('2021-08-06T01:10:09.411889') AND g.status IN array(0) ORDER BY toUInt64(plus(multiply(log(count(e.group_id)), 600), multiply(toUInt64(toUInt64(max(e.timestamp))), 1000))) DESC LIMIT 101 @@ -1015,7 +1015,7 @@ def test_valid_columns_composite_query(self) -> None: "/events/snql", data=json.dumps( { - "query": f"""MATCH (e: events) -[grouped]-> (gm: groupedmessage) + "query": f"""MATCH (e: events) -[grouped]-> (gm: profiles) SELECT e.group_id, gm.status, avg(e.retention_days) AS avg BY e.group_id, gm.status WHERE e.project_id = {self.project_id} AND gm.project_id = {self.project_id} @@ -1028,7 +1028,7 @@ def test_valid_columns_composite_query(self) -> None: ) assert response.status_code == 200 - MATCH = "MATCH (e: events) -[grouped]-> (gm: groupedmessage)" + MATCH = "MATCH (e: events) -[grouped]-> (gm: profiles)" SELECT = "SELECT e.group_id, gm.status, avg(e.retention_days) AS avg BY e.group_id, gm.status" WHERE = "WHERE e.project_id = 1 AND gm.project_id = 1" TIMESTAMPS = ( @@ -1063,7 +1063,7 @@ def test_valid_columns_composite_query(self) -> None: {TIMESTAMPS} """, 400, - "validation failed for entity groupedmessage: Query column 'fsdfsd' does not exist", + "validation failed for entity profiles: Query column 'fsdfsd' does not exist", id="Invalid second Select column", ), pytest.param( @@ -1073,7 +1073,7 @@ def test_valid_columns_composite_query(self) -> None: {TIMESTAMPS} """, 400, - "validation failed for entity groupedmessage: Query column 'fsdfsd' does not exist", + "validation failed for entity profiles: Query column 'fsdfsd' does not exist", id="Invalid By column", ), pytest.param( @@ -1084,7 +1084,7 @@ def test_valid_columns_composite_query(self) -> None: {TIMESTAMPS} """, 400, - "validation failed for entity groupedmessage: Query column 'fsdfsd' does not exist", + "validation failed for entity profiles: Query column 'fsdfsd' does not exist", id="Invalid Where column", ), pytest.param( @@ -1117,10 +1117,10 @@ def test_invalid_columns_composite_query( self, query: str, response_code: int, error_message: str ) -> None: override_entity_column_validator(EntityKey.EVENTS, 
ColumnValidationMode.ERROR) - override_entity_column_validator(EntityKey.GROUPEDMESSAGE, ColumnValidationMode.ERROR) + override_entity_column_validator(EntityKey.PROFILES, ColumnValidationMode.ERROR) response = self.post("/events/snql", data=json.dumps({"query": query})) override_entity_column_validator(EntityKey.EVENTS, ColumnValidationMode.WARN) - override_entity_column_validator(EntityKey.GROUPEDMESSAGE, ColumnValidationMode.WARN) + override_entity_column_validator(EntityKey.PROFILES, ColumnValidationMode.WARN) assert response.status_code == response_code assert json.loads(response.data)["error"]["message"] == error_message diff --git a/tests/test_snql_sdk_api.py b/tests/test_snql_sdk_api.py index 656911f3be9..05cdae63339 100644 --- a/tests/test_snql_sdk_api.py +++ b/tests/test_snql_sdk_api.py @@ -97,7 +97,7 @@ def test_simple_query(self) -> None: def test_join_query(self) -> None: ev = Entity("events", "ev") - gm = Entity("groupedmessage", "gm") + gm = Entity("profiles", "gm") join = Join([Relationship(ev, "grouped", gm)]) query = ( Query(join) @@ -249,7 +249,7 @@ def test_tags_in_groupby(self) -> None: def test_array_condition_unpack_in_join_query(self) -> None: ev = Entity("events", "ev") - gm = Entity("groupedmessage", "gm") + gm = Entity("profiles", "gm") join = Join([Relationship(ev, "grouped", gm)]) query = ( Query(join) diff --git a/tests/test_writer.py b/tests/test_writer.py index b0e95fb36b7..bc26dc1eebf 100644 --- a/tests/test_writer.py +++ b/tests/test_writer.py @@ -41,37 +41,34 @@ def test_error_handling(self) -> None: assert error.value.row == 2 -DATA = """project_id,id,status,last_seen,first_seen,active_at,first_release_id -2,1409156,0,2021-03-13 00:43:02,2021-03-13 00:43:02,2021-03-13 00:43:02, -2,1409157,0,2021-03-13 00:43:02,2021-03-13 00:43:02,2021-03-13 00:43:02, +DATA = """organization_id,project_id,received,platform +1,2,2021-03-13 00:43:02,python +1,2,2021-03-13 00:43:02,python """ class FakeQuery(FormattedQuery): def get_sql(self, format: Optional[str] = None) -> str: - return "SELECT count() FROM groupedmessage_local;" + return "SELECT count() FROM profiles_local;" @pytest.mark.clickhouse_db def test_gzip_load() -> None: content = gzip.compress(DATA.encode("utf-8")) - entity = get_entity(EntityKey.GROUPEDMESSAGE) + entity = get_entity(EntityKey.PROFILES) metrics = DummyMetricsBackend(strict=True) writer = enforce_table_writer(entity).get_bulk_writer( metrics, "gzip", [ + "organization_id", "project_id", - "id", - "status", - "last_seen", - "first_seen", - "active_at", - "first_release_id", + "received", + "platform", ], options=None, - table_name="groupedmessage_local", + table_name="profiles_local", ) writer.write([content]) From 46bc26fbcbb72353097e7ec69d63f7e80cc4b103 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 22 Jan 2026 15:02:25 -0800 Subject: [PATCH 08/10] ref: Remove groupassignee entity and assigned join relationships Remove the `assigned` join relationship from events and discover_events entities as part of removing the groupassignee CDC entity. Update test files to remove all references to groupassignee and EntityKey.GROUPASSIGNEE. 
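
As a result, SnQL queries that traverse the removed `assigned` relationship
no longer validate against the events entity. A representative query shape,
adapted from the removed tests (the project id and time bounds below are
illustrative), that is expected to be rejected after this change:

    MATCH (e: events) -[assigned]-> (ga: groupassignee)
    SELECT e.event_id, ga.user_id
    WHERE e.project_id = 1
      AND e.timestamp >= toDateTime('2021-01-01')
      AND e.timestamp < toDateTime('2021-01-02')

Callers that still issue such joins will need to drop them from their queries.
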
Co-Authored-By: Claude --- .../discover/entities/discover_events.yaml | 8 +- .../configuration/events/entities/events.yaml | 6 - tests/clickhouse/test_query_format.py | 208 --------------- .../entity_join_relationships.yaml | 15 +- .../configuration/test_entity_loader.py | 4 +- .../entities/test_entity_describer.py | 45 ---- tests/datasets/test_dataset_factory.py | 2 - tests/datasets/test_entity_factory.py | 1 - tests/query/data_source/test_join.py | 53 +--- tests/query/joins/equivalence_schema.py | 53 ---- tests/query/joins/join_structures.py | 13 - tests/query/joins/test_equivalences.py | 107 -------- tests/query/joins/test_semi_join.py | 92 ------- tests/query/joins/test_subqueries.py | 69 ----- tests/query/snql/test_invalid_queries.py | 1 - tests/query/snql/test_query.py | 240 ------------------ tests/test_snql_api.py | 45 ---- 17 files changed, 11 insertions(+), 951 deletions(-) delete mode 100644 tests/datasets/entities/test_entity_describer.py diff --git a/snuba/datasets/configuration/discover/entities/discover_events.yaml b/snuba/datasets/configuration/discover/entities/discover_events.yaml index 9fc05f4cf4e..1cb37c88b2d 100644 --- a/snuba/datasets/configuration/discover/entities/discover_events.yaml +++ b/snuba/datasets/configuration/discover/entities/discover_events.yaml @@ -544,10 +544,4 @@ subscription_validators: - orderby required_time_column: timestamp -join_relationships: - assigned: - rhs_entity: groupassignee - join_type: inner - columns: - - [project_id, project_id] - - [group_id, group_id] +join_relationships: {} diff --git a/snuba/datasets/configuration/events/entities/events.yaml b/snuba/datasets/configuration/events/entities/events.yaml index 4a9bab9726c..a17ac3ef485 100644 --- a/snuba/datasets/configuration/events/entities/events.yaml +++ b/snuba/datasets/configuration/events/entities/events.yaml @@ -541,12 +541,6 @@ subscription_validators: allows_group_by_without_condition: true join_relationships: - assigned: - rhs_entity: groupassignee - join_type: inner - columns: - - [project_id, project_id] - - [group_id, group_id] attributes: rhs_entity: group_attributes join_type: left diff --git a/tests/clickhouse/test_query_format.py b/tests/clickhouse/test_query_format.py index 17c826d4cf4..5335bc2077e 100644 --- a/tests/clickhouse/test_query_format.py +++ b/tests/clickhouse/test_query_format.py @@ -48,8 +48,6 @@ ("message", String()), ] ) -GROUPS_ASSIGNEE = ColumnSet([("id", UInt(32)), ("user", String())]) - node_err = IndividualNode( alias="err", data_source=Table("errors_local", ERRORS_SCHEMA, storage_key=StorageKey("errors")), @@ -58,12 +56,6 @@ alias="groups", data_source=Table("profiles_local", GROUPS_SCHEMA, storage_key=StorageKey("groups")), ) -node_assignee = IndividualNode( - alias="assignee", - data_source=Table( - "groupassignee_local", GROUPS_ASSIGNEE, storage_key=StorageKey("group_assignee") - ), -) test_cases = [ pytest.param( @@ -402,159 +394,6 @@ ), id="Simple join", ), - pytest.param( - CompositeQuery( - from_clause=JoinClause( - left_node=JoinClause( - left_node=IndividualNode( - alias="err", - data_source=Query( - from_clause=Table( - "errors_local", - ERRORS_SCHEMA, - storage_key=StorageKey("dontmatter"), - ), - selected_columns=[ - SelectedExpression( - "error_id", Column("error_id", None, "event_id") - ), - SelectedExpression( - "group_id", Column("group_id", None, "group_id") - ), - ], - condition=binary_condition( - "eq", - Column(None, None, "project_id"), - Literal(None, 1), - ), - ), - ), - right_node=IndividualNode( - alias="groups", - 
data_source=Query( - from_clause=Table( - "profiles_local", - GROUPS_SCHEMA, - storage_key=StorageKey("dontmatter"), - ), - selected_columns=[ - SelectedExpression("id", Column("id", None, "id")), - SelectedExpression("message", Column("message", None, "message")), - ], - condition=binary_condition( - "eq", - Column(None, None, "project_id"), - Literal(None, 1), - ), - ), - ), - keys=[ - JoinCondition( - left=JoinConditionExpression("err", "group_id"), - right=JoinConditionExpression("groups", "id"), - ) - ], - join_type=JoinType.INNER, - ), - right_node=IndividualNode( - alias="assignee", - data_source=Query( - from_clause=Table( - "groupassignee_local", - GROUPS_ASSIGNEE, - storage_key=StorageKey("dontmatter"), - ), - selected_columns=[ - SelectedExpression("group_id", Column("group_id", None, "group_id")), - ], - condition=binary_condition( - "eq", - Column(None, None, "user"), - Literal(None, "me"), - ), - ), - ), - keys=[ - JoinCondition( - left=JoinConditionExpression("err", "group_id"), - right=JoinConditionExpression("assignee", "group_id"), - ) - ], - join_type=JoinType.INNER, - ), - selected_columns=[ - SelectedExpression("group_id", Column("group_id", "err", "group_id")), - SelectedExpression("events", FunctionCall("events", "count", tuple())), - ], - groupby=[Column(None, "groups", "id")], - ), - [ - "SELECT (err.group_id AS group_id), (count() AS events)", - [ - "FROM", - [ - [ - [ - [ - "SELECT (event_id AS error_id), group_id", - ["FROM", "errors_local"], - "WHERE eq(project_id, 1)", - ], - "err", - ], - "INNER JOIN", - [ - [ - "SELECT id, message", - ["FROM", "profiles_local"], - "WHERE eq(project_id, 1)", - ], - "groups", - ], - "ON", - ["err.group_id=groups.id"], - ], - "INNER JOIN", - [ - [ - "SELECT group_id", - ["FROM", "groupassignee_local"], - "WHERE eq(user, 'me')", - ], - "assignee", - ], - "ON", - ["err.group_id=assignee.group_id"], - ], - ], - "GROUP BY groups.id", - ], - ( - "SELECT (err.group_id AS group_id), (count() AS events) " - "FROM " - "(SELECT (event_id AS error_id), group_id FROM errors_local WHERE eq(project_id, 1)) err " - "INNER JOIN " - "(SELECT id, message FROM profiles_local WHERE eq(project_id, 1)) groups " - "ON err.group_id=groups.id " - "INNER JOIN " - "(SELECT group_id FROM groupassignee_local WHERE eq(user, 'me')) assignee " - "ON err.group_id=assignee.group_id " - "GROUP BY groups.id" - ), - ( - "SELECT (err.group_id AS group_id), (count() AS events) " - "FROM " - "(SELECT (event_id AS error_id), group_id FROM errors_local WHERE eq(project_id, -1337)) err " - "INNER JOIN " - "(SELECT id, message FROM profiles_local WHERE eq(project_id, -1337)) groups " - "ON err.group_id=groups.id " - "INNER JOIN " - "(SELECT group_id FROM groupassignee_local WHERE eq(user, '$S')) assignee " - "ON err.group_id=assignee.group_id " - "GROUP BY groups.id" - ), - id="Join of multiple subqueries", - ), pytest.param( Query( Table("my_table", ColumnSet([]), storage_key=StorageKey("dontmatter")), @@ -705,53 +544,6 @@ def test_delete_query() -> None: ), id="Simple join", ), - pytest.param( - JoinClause( - left_node=JoinClause( - left_node=node_err, - right_node=node_group, - keys=[ - JoinCondition( - left=JoinConditionExpression("err", "group_id"), - right=JoinConditionExpression("groups", "id"), - ) - ], - join_type=JoinType.INNER, - join_modifier=JoinModifier.SEMI, - ), - right_node=node_assignee, - keys=[ - JoinCondition( - left=JoinConditionExpression("err", "group_id"), - right=JoinConditionExpression("assignee", "id"), - ) - ], - join_type=JoinType.INNER, - 
), - SequenceNode( - [ - SequenceNode( - [ - PaddingNode(None, StringNode("errors_local"), "err"), - StringNode("SEMI INNER JOIN"), - PaddingNode(None, StringNode("profiles_local"), "groups"), - StringNode("ON"), - SequenceNode([StringNode("err.group_id=groups.id")], " AND "), - ] - ), - StringNode("INNER JOIN"), - PaddingNode(None, StringNode("groupassignee_local"), "assignee"), - StringNode("ON"), - SequenceNode([StringNode("err.group_id=assignee.id")], " AND "), - ] - ), - ( - "errors_local err SEMI INNER JOIN profiles_local groups " - "ON err.group_id=groups.id INNER JOIN groupassignee_local assignee " - "ON err.group_id=assignee.id" - ), - id="Complex join", - ), ] diff --git a/tests/datasets/configuration/entity_join_relationships.yaml b/tests/datasets/configuration/entity_join_relationships.yaml index d8aceb3160b..32b6abc4593 100644 --- a/tests/datasets/configuration/entity_join_relationships.yaml +++ b/tests/datasets/configuration/entity_join_relationships.yaml @@ -1,20 +1,19 @@ version: v1 kind: entity -name: groupassignee_test +name: profiles_test schema: [ { name: offset, type: UInt, args: { size: 64 } }, - { name: record_deleted, type: UInt, args: { size: 8 } }, { name: project_id, type: UInt, args: { size: 64 } }, - { name: group_id, type: UInt, args: { size: 64 } }, - { name: date_added, type: DateTime, args: { schema_modifiers: [nullable] } }, - { name: user_id, type: UInt, args: { size: 64, schema_modifiers: [nullable] } }, - { name: team_id, type: UInt, args: { size: 64, schema_modifiers: [nullable] } }, + { name: profile_id, type: UUID }, + { name: organization_id, type: UInt, args: { size: 64 } }, + { name: transaction_id, type: UUID }, + { name: received, type: DateTime }, ] storages: - - storage: groupassignees + - storage: profiles is_writable: true storage_selector: @@ -29,7 +28,7 @@ join_relationships: join_type: left columns: - [project_id, project_id] - - [group_id, group_id] + - [profile_id, profile_id] equivalences: - [offset, offset] diff --git a/tests/datasets/configuration/test_entity_loader.py b/tests/datasets/configuration/test_entity_loader.py index e0e9973e3e6..53a5db0d9d5 100644 --- a/tests/datasets/configuration/test_entity_loader.py +++ b/tests/datasets/configuration/test_entity_loader.py @@ -109,8 +109,8 @@ def test_entity_loader_join_relationships(self) -> None: assert len(rel.columns) == 2 assert rel.columns[0][0] == "project_id" assert rel.columns[0][1] == "project_id" - assert rel.columns[1][0] == "group_id" - assert rel.columns[1][1] == "group_id" + assert rel.columns[1][0] == "profile_id" + assert rel.columns[1][1] == "profile_id" assert len(rel.equivalences) == 1 assert rel.equivalences[0][0] == "offset" assert rel.equivalences[0][1] == "offset" diff --git a/tests/datasets/entities/test_entity_describer.py b/tests/datasets/entities/test_entity_describer.py deleted file mode 100644 index b243adde4ea..00000000000 --- a/tests/datasets/entities/test_entity_describer.py +++ /dev/null @@ -1,45 +0,0 @@ -from snuba.datasets.entities.entity_key import EntityKey -from snuba.datasets.entities.factory import get_entity -from snuba.utils.describer import Description, Property - - -def test_entity_describer() -> None: - entity = get_entity(EntityKey.GROUPASSIGNEE) - description = entity.describe() - - assert description == Description( - header=None, - content=[ - Description( - header="Entity schema", - content=[ - "offset UInt64", - "record_deleted UInt8", - "project_id UInt64", - "group_id UInt64", - "date_added Nullable(DateTime)", - "user_id 
Nullable(UInt64)", - "team_id Nullable(UInt64)", - ], - ), - Description( - header="Relationships", - content=[ - Description( - header="owns", - content=[ - Property("Destination", "events"), - Property("Type", "LEFT"), - Description( - header="Join keys", - content=[ - "project_id = LEFT.project_id", - "group_id = LEFT.group_id", - ], - ), - ], - ) - ], - ), - ], - ) diff --git a/tests/datasets/test_dataset_factory.py b/tests/datasets/test_dataset_factory.py index 3d7988a5642..0a57f1fab13 100644 --- a/tests/datasets/test_dataset_factory.py +++ b/tests/datasets/test_dataset_factory.py @@ -19,7 +19,6 @@ def test_get_dataset() -> None: "discover", "events", "events_analytics_platform", - "groupassignee", "metrics", "outcomes", "outcomes_raw", @@ -63,7 +62,6 @@ def test_all_names() -> None: "discover", "events", "events_analytics_platform", - "groupassignee", "metrics", "outcomes", "outcomes_raw", diff --git a/tests/datasets/test_entity_factory.py b/tests/datasets/test_entity_factory.py index c08d8bcc033..c638d4c2e62 100644 --- a/tests/datasets/test_entity_factory.py +++ b/tests/datasets/test_entity_factory.py @@ -16,7 +16,6 @@ EntityKey.EVENTS, EntityKey.SPANS_NUM_ATTRS, EntityKey.SPANS_STR_ATTRS, - EntityKey.GROUPASSIGNEE, EntityKey.OUTCOMES, EntityKey.OUTCOMES_RAW, EntityKey.SEARCH_ISSUES, diff --git a/tests/query/data_source/test_join.py b/tests/query/data_source/test_join.py index f751539c778..2664ba30488 100644 --- a/tests/query/data_source/test_join.py +++ b/tests/query/data_source/test_join.py @@ -1,6 +1,5 @@ -from snuba.clickhouse.columns import UUID, Any, ColumnSet, String, UInt +from snuba.clickhouse.columns import UUID, ColumnSet, String, UInt from snuba.datasets.entities.entity_key import EntityKey -from snuba.query import SelectedExpression from snuba.query.data_source.join import ( IndividualNode, JoinClause, @@ -10,15 +9,11 @@ JoinType, ) from snuba.query.data_source.simple import Entity -from snuba.query.expressions import Column -from snuba.query.logical import Query ERRORS_SCHEMA = ColumnSet([("event_id", UUID()), ("message", String()), ("group_id", UInt(32))]) GROUPS_SCHEMA = ColumnSet([("id", UInt(32)), ("message", String())]) -GROUPS_ASSIGNEE = ColumnSet([("id", UInt(32)), ("user", String())]) - def test_entity_node() -> None: e = Entity(key=EntityKey.EVENTS, schema=ERRORS_SCHEMA) @@ -54,49 +49,3 @@ def test_simple_join() -> None: assert "err.event_id" in joined_cols assert "groups.id" in joined_cols assert "groups.message" in joined_cols - - -def test_complex_joins() -> None: - e = Entity(key=EntityKey.EVENTS, schema=ERRORS_SCHEMA) - node_err = IndividualNode(alias="err", data_source=e) - - g = Entity(key=EntityKey.PROFILES, schema=GROUPS_SCHEMA) - node_group = IndividualNode(alias="groups", data_source=g) - - a = Entity(key=EntityKey.GROUPASSIGNEE, schema=GROUPS_ASSIGNEE) - query = Query( - from_clause=a, - selected_columns=[ - SelectedExpression("id", Column("id", None, "id")), - SelectedExpression("assigned_user", Column("assigned_user", None, "user")), - ], - ) - node_query = IndividualNode(alias="assignee", data_source=query) - - join = JoinClause( - left_node=JoinClause( - left_node=node_err, - right_node=node_group, - keys=[ - JoinCondition( - left=JoinConditionExpression("err", "group_id"), - right=JoinConditionExpression("groups", "id"), - ) - ], - join_type=JoinType.INNER, - ), - right_node=node_query, - keys=[ - JoinCondition( - left=JoinConditionExpression("err", "group_id"), - right=JoinConditionExpression("assignee", "id"), - ) - ], - 
join_type=JoinType.INNER, - ) - - assert join.get_column_sets() == { - "err": ERRORS_SCHEMA, - "assignee": ColumnSet([("id", Any()), ("assigned_user", Any())]), - "groups": GROUPS_SCHEMA, - } diff --git a/tests/query/joins/equivalence_schema.py b/tests/query/joins/equivalence_schema.py index d29babe1a5f..2b89bfa3409 100644 --- a/tests/query/joins/equivalence_schema.py +++ b/tests/query/joins/equivalence_schema.py @@ -26,14 +26,6 @@ ] ) -GROUPS_ASSIGNEE = ColumnSet( - [ - ("group_id", UUID()), - ("project_id", UInt(32)), - ("message", String()), - ("user_id", UInt(64)), - ] -) class FakeEntity(Entity, ABC): @@ -53,19 +45,6 @@ def __init__(self) -> None: join_type=JoinType.INNER, equivalences=[ColumnEquivalence("project_id", "project_id")], ), - "assigned_group": JoinRelationship( - rhs_entity=EntityKey.GROUPASSIGNEE, - columns=[("group_id", "group_id")], - join_type=JoinType.INNER, - equivalences=[ColumnEquivalence("project_id", "project_id")], - ), - # This makes no sense but it is for the sake of the test - "assigned_user": JoinRelationship( - rhs_entity=EntityKey.GROUPASSIGNEE, - columns=[("user_id", "user_id")], - join_type=JoinType.INNER, - equivalences=[ColumnEquivalence("project_id", "project_id")], - ), # This makes even less sense but self referencing joins are possible "self_relationship": JoinRelationship( rhs_entity=EntityKey.EVENTS, @@ -93,38 +72,6 @@ def __init__(self) -> None: join_type=JoinType.INNER, equivalences=[ColumnEquivalence("project_id", "project_id")], ), - "assigned": JoinRelationship( - rhs_entity=EntityKey.GROUPASSIGNEE, - columns=[("user_id", "user_id")], - join_type=JoinType.INNER, - equivalences=[], - ), - }, - validators=None, - required_time_column=None, - subscription_processors=None, - subscription_validators=None, - ) - - -class GroupAssignee(FakeEntity): - def __init__(self) -> None: - super().__init__( - storages=[], - abstract_column_set=GROUPS_ASSIGNEE, - join_relationships={ - "events": JoinRelationship( - rhs_entity=EntityKey.EVENTS, - columns=[("group_id", "group_id")], - join_type=JoinType.INNER, - equivalences=[ColumnEquivalence("project_id", "project_id")], - ), - "user_assigned": JoinRelationship( - rhs_entity=EntityKey.EVENTS, - columns=[("user_id", "user_id")], - join_type=JoinType.INNER, - equivalences=[ColumnEquivalence("project_id", "project_id")], - ), }, validators=None, required_time_column=None, diff --git a/tests/query/joins/join_structures.py b/tests/query/joins/join_structures.py index 4a0e7755e1b..63574516ae5 100644 --- a/tests/query/joins/join_structures.py +++ b/tests/query/joins/join_structures.py @@ -18,7 +18,6 @@ from snuba.query.logical import Query as LogicalQuery from tests.query.joins.equivalence_schema import ( EVENTS_SCHEMA, - GROUPS_ASSIGNEE, GROUPS_SCHEMA, ) @@ -117,18 +116,6 @@ def clickhouse_groups_node( ) -def clickhouse_assignees_node( - selected_columns: Sequence[SelectedExpression], - condition: Optional[Expression] = None, -) -> IndividualNode[Table]: - return build_clickhouse_node( - "as", - Table("groupassignee_local", GROUPS_ASSIGNEE, storage_key=StorageKey("dontmatter")), - selected_columns, - condition, - ) - - def events_groups_join( left: IndividualNode[TNode], right: IndividualNode[TNode], diff --git a/tests/query/joins/test_equivalences.py b/tests/query/joins/test_equivalences.py index 70fd9369bbf..f066892ee9c 100644 --- a/tests/query/joins/test_equivalences.py +++ b/tests/query/joins/test_equivalences.py @@ -17,10 +17,8 @@ ) from tests.query.joins.equivalence_schema import ( EVENTS_SCHEMA, - 
GROUPS_ASSIGNEE, GROUPS_SCHEMA, Events, - GroupAssignee, Profiles, ) @@ -54,110 +52,6 @@ }, id="Two entities join", ), - pytest.param( - JoinClause( - JoinClause( - IndividualNode("ev", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None)), - IndividualNode("as", EntitySource(EntityKey.GROUPASSIGNEE, GROUPS_ASSIGNEE, None)), - [ - JoinCondition( - JoinConditionExpression("ev", "group_id"), - JoinConditionExpression("as", "group_id"), - ) - ], - JoinType.INNER, - None, - ), - IndividualNode("gr", EntitySource(EntityKey.PROFILES, GROUPS_SCHEMA, None)), - [ - JoinCondition( - JoinConditionExpression("ev", "group_id"), - JoinConditionExpression("gr", "id"), - ) - ], - JoinType.INNER, - None, - ), - { - QualifiedCol(EntityKey.EVENTS, "group_id"): { - QualifiedCol(EntityKey.PROFILES, "id"), - QualifiedCol(EntityKey.GROUPASSIGNEE, "group_id"), - }, - QualifiedCol(EntityKey.PROFILES, "id"): { - QualifiedCol(EntityKey.EVENTS, "group_id"), - QualifiedCol(EntityKey.GROUPASSIGNEE, "group_id"), - }, - QualifiedCol(EntityKey.GROUPASSIGNEE, "group_id"): { - QualifiedCol(EntityKey.EVENTS, "group_id"), - QualifiedCol(EntityKey.PROFILES, "id"), - }, - QualifiedCol(EntityKey.EVENTS, "project_id"): { - QualifiedCol(EntityKey.PROFILES, "project_id"), - QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"), - }, - QualifiedCol(EntityKey.PROFILES, "project_id"): { - QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"), - QualifiedCol(EntityKey.EVENTS, "project_id"), - }, - QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"): { - QualifiedCol(EntityKey.PROFILES, "project_id"), - QualifiedCol(EntityKey.EVENTS, "project_id"), - }, - }, - id="Join with three tables", - ), - pytest.param( - JoinClause( - JoinClause( - IndividualNode("ev", EntitySource(EntityKey.EVENTS, EVENTS_SCHEMA, None)), - IndividualNode("gr", EntitySource(EntityKey.PROFILES, GROUPS_SCHEMA, None)), - [ - JoinCondition( - JoinConditionExpression("ev", "group_id"), - JoinConditionExpression("gr", "id"), - ) - ], - JoinType.INNER, - None, - ), - IndividualNode("as", EntitySource(EntityKey.GROUPASSIGNEE, GROUPS_ASSIGNEE, None)), - [ - JoinCondition( - JoinConditionExpression("gr", "user_id"), - JoinConditionExpression("as", "user_id"), - ) - ], - JoinType.INNER, - None, - ), - { - QualifiedCol(EntityKey.EVENTS, "group_id"): { - QualifiedCol(EntityKey.PROFILES, "id"), - }, - QualifiedCol(EntityKey.PROFILES, "id"): { - QualifiedCol(EntityKey.EVENTS, "group_id"), - }, - QualifiedCol(EntityKey.PROFILES, "user_id"): { - QualifiedCol(EntityKey.GROUPASSIGNEE, "user_id"), - }, - QualifiedCol(EntityKey.GROUPASSIGNEE, "user_id"): { - QualifiedCol(EntityKey.PROFILES, "user_id"), - }, - QualifiedCol(EntityKey.EVENTS, "project_id"): { - QualifiedCol(EntityKey.PROFILES, "project_id"), - QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"), - }, - QualifiedCol(EntityKey.PROFILES, "project_id"): { - QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"), - QualifiedCol(EntityKey.EVENTS, "project_id"), - }, - QualifiedCol(EntityKey.GROUPASSIGNEE, "project_id"): { - QualifiedCol(EntityKey.PROFILES, "project_id"), - QualifiedCol(EntityKey.EVENTS, "project_id"), - }, - }, - id="Join with three tables", - ), ] @@ -165,7 +59,6 @@ def test_find_equivalences(join: JoinClause[EntitySource], graph: EquivalenceGraph) -> None: override_entity_map(EntityKey.EVENTS, Events()) override_entity_map(EntityKey.PROFILES, Profiles()) - override_entity_map(EntityKey.GROUPASSIGNEE, GroupAssignee()) assert get_equivalent_columns(join) == graph diff --git a/tests/query/joins/test_semi_join.py 
b/tests/query/joins/test_semi_join.py index cd9b50163a2..02c06b7a304 100644 --- a/tests/query/joins/test_semi_join.py +++ b/tests/query/joins/test_semi_join.py @@ -6,17 +6,13 @@ from snuba.query.composite import CompositeQuery from snuba.query.data_source.join import ( JoinClause, - JoinCondition, - JoinConditionExpression, JoinModifier, - JoinType, ) from snuba.query.data_source.simple import Table from snuba.query.expressions import Column from snuba.query.joins.semi_joins import SemiJoinOptimizer from snuba.query.query_settings import HTTPQuerySettings from tests.query.joins.join_structures import ( - clickhouse_assignees_node, clickhouse_events_node, clickhouse_groups_node, events_groups_join, @@ -113,94 +109,6 @@ {"gr": None}, id="Query with reference to columns on the right side. No semi join", ), - pytest.param( - CompositeQuery( - from_clause=JoinClause( - left_node=events_groups_join( - clickhouse_events_node( - [ - SelectedExpression( - "_snuba_group_id", - Column("_snuba_group_id", None, "group_id"), - ), - ] - ), - clickhouse_groups_node( - [ - SelectedExpression("_snuba_id", Column("_snuba_id", None, "id")), - SelectedExpression( - "_snuba_col1", Column("_snuba_col1", None, "something") - ), - ] - ), - ), - right_node=clickhouse_assignees_node( - [ - SelectedExpression( - "_snuba_group_id", - Column("_snuba_group_id", None, "group_id"), - ), - SelectedExpression("_snuba_col1", Column("_snuba_col1", None, "something")), - ] - ), - keys=[ - JoinCondition( - left=JoinConditionExpression("ev", "_snuba_group_id"), - right=JoinConditionExpression("as", "_snuba_group_id"), - ) - ], - join_type=JoinType.INNER, - ), - selected_columns=[ - SelectedExpression("group_id", Column("_snuba_col1", "gr", "_snuba_col1")) - ], - ), - {"gr": None, "as": JoinModifier.ANY}, - id="Multi table join, make only the right one a semi join.", - ), - pytest.param( - CompositeQuery( - from_clause=JoinClause( - left_node=events_groups_join( - clickhouse_events_node( - [ - SelectedExpression( - "_snuba_group_id", - Column("_snuba_group_id", None, "group_id"), - ), - ] - ), - clickhouse_groups_node( - [ - SelectedExpression("_snuba_id", Column("_snuba_id", None, "id")), - SelectedExpression( - "_snuba_col1", Column("_snuba_col1", None, "something") - ), - ] - ), - ), - right_node=clickhouse_assignees_node( - [ - SelectedExpression( - "_snuba_group_id", - Column("_snuba_group_id", None, "group_id"), - ), - SelectedExpression("_snuba_col1", Column("_snuba_col1", None, "something")), - ] - ), - keys=[ - JoinCondition( - left=JoinConditionExpression("ev", "_snuba_group_id"), - right=JoinConditionExpression("as", "_snuba_group_id"), - ) - ], - join_type=JoinType.INNER, - ), - selected_columns=[], - ), - {"gr": JoinModifier.ANY, "as": JoinModifier.ANY}, - id="Multi table join, make both joins semi join.", - ), ] diff --git a/tests/query/joins/test_subqueries.py b/tests/query/joins/test_subqueries.py index 2cd86d82552..387f89c0d94 100644 --- a/tests/query/joins/test_subqueries.py +++ b/tests/query/joins/test_subqueries.py @@ -25,10 +25,8 @@ from snuba.query.logical import Query as LogicalQuery from tests.query.joins.equivalence_schema import ( EVENTS_SCHEMA, - GROUPS_ASSIGNEE, GROUPS_SCHEMA, Events, - GroupAssignee, Profiles, ) from tests.query.joins.join_structures import ( @@ -239,72 +237,6 @@ ), id="Query with condition across entities", ), - pytest.param( - CompositeQuery( - from_clause=JoinClause( - left_node=BASIC_JOIN, - right_node=IndividualNode( - "as", Entity(EntityKey.GROUPASSIGNEE, GROUPS_ASSIGNEE, None) 
- ), - keys=[ - JoinCondition( - left=JoinConditionExpression("ev", "group_id"), - right=JoinConditionExpression("as", "group_id"), - ) - ], - join_type=JoinType.INNER, - ), - selected_columns=[ - SelectedExpression("group_id", Column("_snuba_group_id", "gr", "id")), - ], - ), - CompositeQuery( - from_clause=JoinClause( - left_node=events_groups_join( - events_node( - [ - SelectedExpression( - "_snuba_group_id", - Column("_snuba_group_id", None, "group_id"), - ), - ], - ), - groups_node( - [ - SelectedExpression( - "_snuba_group_id", - Column("_snuba_group_id", None, "id"), - ), - SelectedExpression("_snuba_id", Column("_snuba_id", None, "id")), - ], - ), - ), - right_node=IndividualNode( - alias="as", - data_source=LogicalQuery( - from_clause=Entity(EntityKey.GROUPASSIGNEE, GROUPS_ASSIGNEE), - selected_columns=[ - SelectedExpression( - "_snuba_group_id", - Column("_snuba_group_id", None, "group_id"), - ), - ], - ), - ), - keys=[ - JoinCondition( - left=JoinConditionExpression("ev", "_snuba_group_id"), - right=JoinConditionExpression("as", "_snuba_group_id"), - ) - ], - join_type=JoinType.INNER, - ), - selected_columns=[ - SelectedExpression("group_id", Column("_snuba_group_id", "gr", "_snuba_group_id")), - ], - ), - id="Multi entity join", - ), pytest.param( CompositeQuery( from_clause=BASIC_JOIN, @@ -495,7 +427,6 @@ def test_subquery_generator( ) -> None: override_entity_map(EntityKey.EVENTS, Events()) override_entity_map(EntityKey.PROFILES, Profiles()) - override_entity_map(EntityKey.GROUPASSIGNEE, GroupAssignee()) generate_subqueries(original_query) diff --git a/tests/query/snql/test_invalid_queries.py b/tests/query/snql/test_invalid_queries.py index b7365ead494..087dbdf062e 100644 --- a/tests/query/snql/test_invalid_queries.py +++ b/tests/query/snql/test_invalid_queries.py @@ -98,7 +98,6 @@ def test_failures(query_body: str, message: str) -> None: # TODO: Potentially remove this once entities have actual join relationships mapping = { "contains": (EntityKey.TRANSACTIONS, "event_id"), - "assigned": (EntityKey.GROUPASSIGNEE, "group_id"), "bookmark": (EntityKey.PROFILES, "profile_id"), } diff --git a/tests/query/snql/test_query.py b/tests/query/snql/test_query.py index 19f30e76e8e..c3f6182f6d2 100644 --- a/tests/query/snql/test_query.py +++ b/tests/query/snql/test_query.py @@ -937,245 +937,6 @@ def build_cond(tn: str) -> str: ), id="Basic join match with sample", ), - pytest.param( - f"""MATCH - (e: events) -[contains]-> (t: transactions), - (e: events) -[assigned]-> (ga: groupassignee) - SELECT 4-5, ga.offset - WHERE {build_cond("e")} AND {build_cond("t")}""", - CompositeQuery( - from_clause=JoinClause( - left_node=JoinClause( - left_node=IndividualNode( - "e", - QueryEntity( - EntityKey.EVENTS, - get_entity(EntityKey.EVENTS).get_data_model(), - ), - ), - right_node=IndividualNode( - "ga", - QueryEntity( - EntityKey.GROUPASSIGNEE, - get_entity(EntityKey.GROUPASSIGNEE).get_data_model(), - ), - ), - keys=[ - JoinCondition( - JoinConditionExpression("e", "event_id"), - JoinConditionExpression("ga", "group_id"), - ) - ], - join_type=JoinType.INNER, - ), - right_node=IndividualNode( - "t", - QueryEntity( - EntityKey.TRANSACTIONS, - get_entity(EntityKey.TRANSACTIONS).get_data_model(), - ), - ), - keys=[ - JoinCondition( - JoinConditionExpression("e", "event_id"), - JoinConditionExpression("t", "event_id"), - ) - ], - join_type=JoinType.INNER, - ), - selected_columns=[ - SelectedExpression( - "4-5", - FunctionCall("_snuba_4-5", "minus", (Literal(None, 4), Literal(None, 5))), - ), - 
SelectedExpression("ga.offset", Column("_snuba_ga.offset", "ga", "offset")), - ], - condition=and_cond( - and_cond( - f.equals(column("project_id", "e", "_snuba_e.project_id"), literal(1)), - f.greaterOrEquals( - column("timestamp", "e", "_snuba_e.timestamp"), - literal(datetime.datetime(2021, 1, 1, 0, 0)), - ), - ), - and_cond( - and_cond( - f.less( - column("timestamp", "e", "_snuba_e.timestamp"), - literal(datetime.datetime(2021, 1, 2, 0, 0)), - ), - f.equals(column("project_id", "t", "_snuba_t.project_id"), literal(1)), - ), - and_cond( - f.greaterOrEquals( - column("finish_ts", "t", "_snuba_t.finish_ts"), - literal(datetime.datetime(2021, 1, 1, 0, 0)), - ), - f.less( - column("finish_ts", "t", "_snuba_t.finish_ts"), - literal(datetime.datetime(2021, 1, 2, 0, 0)), - ), - ), - ), - ), - groupby=None, - limit=1000, - offset=0, - ), - id="Multi join match", - ), - pytest.param( - f"""MATCH - (e: events) -[contains]-> (t: transactions), - (e: events) -[assigned]-> (ga: groupassignee), - (e: events) -[bookmark]-> (gm: profiles), - (e: events) -[activity]-> (se: metrics_sets) - SELECT 4-5, e.event_id, t.event_id, ga.offset, gm.offset, se.metric_id - WHERE {build_cond("e")} AND {build_cond("t")} - AND se.org_id = 1 AND se.project_id = 1 - AND se.timestamp >= toDateTime('2021-01-01') AND se.timestamp < toDateTime('2021-01-02')""", - CompositeQuery( - from_clause=JoinClause( - left_node=JoinClause( - left_node=JoinClause( - left_node=JoinClause( - left_node=IndividualNode( - "e", - QueryEntity( - EntityKey.EVENTS, - get_entity(EntityKey.EVENTS).get_data_model(), - ), - ), - right_node=IndividualNode( - "se", - QueryEntity( - EntityKey.METRICS_SETS, - get_entity(EntityKey.METRICS_SETS).get_data_model(), - ), - ), - keys=[ - JoinCondition( - JoinConditionExpression("e", "event_id"), - JoinConditionExpression("se", "org_id"), - ) - ], - join_type=JoinType.INNER, - ), - right_node=IndividualNode( - "gm", - QueryEntity( - EntityKey.PROFILES, - get_entity(EntityKey.PROFILES).get_data_model(), - ), - ), - keys=[ - JoinCondition( - JoinConditionExpression("e", "event_id"), - JoinConditionExpression("gm", "profile_id"), - ) - ], - join_type=JoinType.INNER, - ), - right_node=IndividualNode( - "ga", - QueryEntity( - EntityKey.GROUPASSIGNEE, - get_entity(EntityKey.GROUPASSIGNEE).get_data_model(), - ), - ), - keys=[ - JoinCondition( - JoinConditionExpression("e", "event_id"), - JoinConditionExpression("ga", "group_id"), - ) - ], - join_type=JoinType.INNER, - ), - right_node=IndividualNode( - "t", - QueryEntity( - EntityKey.TRANSACTIONS, - get_entity(EntityKey.TRANSACTIONS).get_data_model(), - ), - ), - keys=[ - JoinCondition( - JoinConditionExpression("e", "event_id"), - JoinConditionExpression("t", "event_id"), - ) - ], - join_type=JoinType.INNER, - ), - selected_columns=[ - SelectedExpression( - "4-5", - FunctionCall("_snuba_4-5", "minus", (Literal(None, 4), Literal(None, 5))), - ), - SelectedExpression("e.event_id", Column("_snuba_e.event_id", "e", "event_id")), - SelectedExpression("t.event_id", Column("_snuba_t.event_id", "t", "event_id")), - SelectedExpression("ga.offset", Column("_snuba_ga.offset", "ga", "offset")), - SelectedExpression("gm.offset", Column("_snuba_gm.offset", "gm", "offset")), - SelectedExpression( - "se.metric_id", Column("_snuba_se.metric_id", "se", "metric_id") - ), - ], - condition=and_cond( - and_cond( - f.equals(column("project_id", "e", "_snuba_e.project_id"), literal(1)), - f.greaterOrEquals( - column("timestamp", "e", "_snuba_e.timestamp"), - literal(datetime.datetime(2021, 
1, 1, 0, 0)), - ), - ), - and_cond( - and_cond( - and_cond( - f.less( - column("timestamp", "e", "_snuba_e.timestamp"), - literal(datetime.datetime(2021, 1, 2, 0, 0)), - ), - f.equals( - column("project_id", "t", "_snuba_t.project_id"), - literal(1), - ), - ), - and_cond( - f.greaterOrEquals( - column("finish_ts", "t", "_snuba_t.finish_ts"), - literal(datetime.datetime(2021, 1, 1, 0, 0)), - ), - f.less( - column("finish_ts", "t", "_snuba_t.finish_ts"), - literal(datetime.datetime(2021, 1, 2, 0, 0)), - ), - ), - ), - and_cond( - and_cond( - f.equals(column("org_id", "se", "_snuba_se.org_id"), literal(1)), - f.equals( - column("project_id", "se", "_snuba_se.project_id"), - literal(1), - ), - ), - and_cond( - f.greaterOrEquals( - column("timestamp", "se", "_snuba_se.timestamp"), - literal(datetime.datetime(2021, 1, 1, 0, 0)), - ), - f.less( - column("timestamp", "se", "_snuba_se.timestamp"), - literal(datetime.datetime(2021, 1, 2, 0, 0)), - ), - ), - ), - ), - ), - limit=1000, - offset=0, - ), - id="Multi multi join match", - ), pytest.param( "MATCH { MATCH (events) SELECT count() AS count BY title WHERE %s } SELECT max(count) AS max_count" % added_condition, @@ -1812,7 +1573,6 @@ def test_format_expressions(query_body: str, expected_query: LogicalQuery) -> No # TODO: Potentially remove this once entities have actual join relationships mapping = { "contains": (EntityKey.TRANSACTIONS, "event_id"), - "assigned": (EntityKey.GROUPASSIGNEE, "group_id"), "bookmark": (EntityKey.PROFILES, "profile_id"), "activity": (EntityKey.METRICS_SETS, "org_id"), } diff --git a/tests/test_snql_api.py b/tests/test_snql_api.py index ea51c15a500..a02c3805a32 100644 --- a/tests/test_snql_api.py +++ b/tests/test_snql_api.py @@ -624,51 +624,6 @@ def test_alias_limitby_aggregator(self) -> None: data = json.loads(response.data) assert "LIMIT 1 BY _snuba_count" in data["sql"] - def test_multi_table_join(self) -> None: - response = self.post( - "/events/snql", - data=json.dumps( - { - "query": f""" - MATCH (e: events) -[grouped]-> (g: profiles), - (e: events) -[assigned]-> (a: groupassignee) - SELECT e.message, e.tags[b], a.user_id, g.last_seen - WHERE e.project_id = {self.project_id} - AND g.project_id = {self.project_id} - AND e.timestamp >= toDateTime('2021-06-04T00:00:00') - AND e.timestamp < toDateTime('2021-07-12T00:00:00') - """, - "turbo": False, - "consistent": False, - "debug": True, - "tenant_ids": {"referrer": "r", "organization_id": 123}, - } - ), - ) - - assert response.status_code == 200 - - def test_complex_table_join(self) -> None: - response = self.post( - "/events/snql", - data=json.dumps( - { - "query": f""" - MATCH (e: events) -[grouped]-> (g: profiles) - SELECT g.id, toUInt64(plus(multiply(log(count(e.group_id)), 600), multiply(toUInt64(toUInt64(max(e.timestamp))), 1000))) AS score BY g.id - WHERE e.project_id IN array({self.project_id}) AND e.timestamp >= toDateTime('2021-07-04T01:09:08.188427') AND e.timestamp < toDateTime('2021-08-06T01:10:09.411889') AND g.status IN array(0) - ORDER BY toUInt64(plus(multiply(log(count(e.group_id)), 600), multiply(toUInt64(toUInt64(max(e.timestamp))), 1000))) DESC LIMIT 101 - """, - "turbo": False, - "consistent": False, - "debug": True, - "tenant_ids": {"referrer": "r", "organization_id": 123}, - } - ), - ) - - assert response.status_code == 200 - def test_nullable_query(self) -> None: response = self.post( "/discover/snql", From adad7be269e6bc1d9bb7f52ee235b786857e810f Mon Sep 17 00:00:00 2001 From: "getsantry[bot]" 
 <66042841+getsantry[bot]@users.noreply.github.com>
Date: Thu, 22 Jan 2026 23:03:28 +0000
Subject: [PATCH 09/10] [getsentry/action-github-commit] Auto commit

---
 tests/query/joins/equivalence_schema.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/query/joins/equivalence_schema.py b/tests/query/joins/equivalence_schema.py
index 2b89bfa3409..567aabb5c2c 100644
--- a/tests/query/joins/equivalence_schema.py
+++ b/tests/query/joins/equivalence_schema.py
@@ -27,7 +27,6 @@
 )
 
 
-
 class FakeEntity(Entity, ABC):
     def get_query_processors(self) -> Sequence[LogicalQueryProcessor]:
         return []

From c8fdcfe17e3defd402dd1b6b38e2ead7fbe55624 Mon Sep 17 00:00:00 2001
From: Pierre Massat
Date: Thu, 22 Jan 2026 15:07:07 -0800
Subject: [PATCH 10/10] fix: Address mypy type errors in test files

- Wrap Column in list for array_join and LimitBy arguments
- Add type parameter to ProcessableQuery
- Add type: ignore comments for test entity fixtures

Co-Authored-By: Claude
---
 tests/query/formatters/test_query.py        | 6 +++---
 tests/query/joins/test_equivalence_adder.py | 4 ++--
 tests/query/joins/test_equivalences.py      | 4 ++--
 tests/query/joins/test_subqueries.py        | 4 ++--
 4 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/tests/query/formatters/test_query.py b/tests/query/formatters/test_query.py
index d074c16c7f4..0cff7fb8edc 100644
--- a/tests/query/formatters/test_query.py
+++ b/tests/query/formatters/test_query.py
@@ -62,12 +62,12 @@
         SelectedExpression("c1", Column("_snuba_c1", "t", "c")),
         SelectedExpression("f1", FunctionCall("_snuba_f1", "f", (Column(None, "t", "c2"),))),
     ],
-    array_join=Column(None, None, "col"),
+    array_join=[Column(None, None, "col")],
     condition=binary_condition("equals", Column(None, None, "c4"), Literal(None, "asd")),
     groupby=[Column(None, "t", "c4")],
     having=binary_condition("equals", Column(None, None, "c6"), Literal(None, "asd2")),
     order_by=[OrderBy(OrderByDirection.ASC, Column(None, "t", "c"))],
-    limitby=LimitBy(100, Column(None, None, "c8")),
+    limitby=LimitBy(100, [Column(None, None, "c8")]),
     limit=150,
 )
 
@@ -249,7 +249,7 @@
 
 @pytest.mark.parametrize("query, formatted", TEST_JOIN)
 def test_query_formatter(
-    query: Union[ProcessableQuery, CompositeQuery[Entity]],
+    query: Union[ProcessableQuery[Entity], CompositeQuery[Entity]],
     formatted: TExpression,
 ) -> None:
     formatted_query = format_query(query)  # type: ignore
diff --git a/tests/query/joins/test_equivalence_adder.py b/tests/query/joins/test_equivalence_adder.py
index 2b2be3e801a..00c02fdaaa4 100644
--- a/tests/query/joins/test_equivalence_adder.py
+++ b/tests/query/joins/test_equivalence_adder.py
@@ -251,8 +251,8 @@ def test_add_equivalent_condition(
     join_clause: JoinClause[EntitySource],
     expected_expr: Expression,
 ) -> None:
-    override_entity_map(EntityKey.EVENTS, Events())
-    override_entity_map(EntityKey.PROFILES, Profiles())
+    override_entity_map(EntityKey.EVENTS, Events())  # type: ignore[arg-type]
+    override_entity_map(EntityKey.PROFILES, Profiles())  # type: ignore[arg-type]
 
     query = CompositeQuery(
         from_clause=join_clause,
diff --git a/tests/query/joins/test_equivalences.py b/tests/query/joins/test_equivalences.py
index f066892ee9c..22e3a3ba9c5 100644
--- a/tests/query/joins/test_equivalences.py
+++ b/tests/query/joins/test_equivalences.py
@@ -57,8 +57,8 @@
 
 @pytest.mark.parametrize("join, graph", TEST_CASES)
 def test_find_equivalences(join: JoinClause[EntitySource], graph: EquivalenceGraph) -> None:
-    override_entity_map(EntityKey.EVENTS, Events())
-    override_entity_map(EntityKey.PROFILES, Profiles())
+    override_entity_map(EntityKey.EVENTS, Events())  # type: ignore[arg-type]
+    override_entity_map(EntityKey.PROFILES, Profiles())  # type: ignore[arg-type]
 
     assert get_equivalent_columns(join) == graph
 
diff --git a/tests/query/joins/test_subqueries.py b/tests/query/joins/test_subqueries.py
index 387f89c0d94..2364b24f042 100644
--- a/tests/query/joins/test_subqueries.py
+++ b/tests/query/joins/test_subqueries.py
@@ -425,8 +425,8 @@ def test_subquery_generator(
     original_query: CompositeQuery[Entity],
     processed_query: CompositeQuery[Entity],
 ) -> None:
-    override_entity_map(EntityKey.EVENTS, Events())
-    override_entity_map(EntityKey.PROFILES, Profiles())
+    override_entity_map(EntityKey.EVENTS, Events())  # type: ignore[arg-type]
+    override_entity_map(EntityKey.PROFILES, Profiles())  # type: ignore[arg-type]
 
     generate_subqueries(original_query)