diff --git a/uv.lock b/uv.lock index 7d7341893..045953eed 100644 --- a/uv.lock +++ b/uv.lock @@ -2545,6 +2545,7 @@ test = [ { name = "pytest-httpx" }, { name = "pytest-mock", specifier = ">=3.14.0" }, { name = "pytest-xdist" }, + { name = "responses", specifier = ">=0.25.3" }, ] type-check = [ { name = "celery-types", specifier = ">=0.22.0" }, diff --git a/validator/app/src/compute_horde_validator/settings.py b/validator/app/src/compute_horde_validator/settings.py index 34252183e..37d4a598c 100644 --- a/validator/app/src/compute_horde_validator/settings.py +++ b/validator/app/src/compute_horde_validator/settings.py @@ -3,7 +3,6 @@ """ import inspect -import logging import pathlib from datetime import timedelta from functools import wraps @@ -13,6 +12,8 @@ from celery.schedules import crontab from compute_horde import base # noqa +from compute_horde_validator.validator.sentry import init_sentry + root = environ.Path(__file__) - 2 env = environ.Env(DEBUG=(bool, False)) @@ -641,22 +642,6 @@ def BITTENSOR_WALLET() -> bittensor.wallet: # Sentry if SENTRY_DSN := env("SENTRY_DSN", default=""): - import sentry_sdk - from sentry_sdk.integrations.celery import CeleryIntegration - from sentry_sdk.integrations.django import DjangoIntegration - from sentry_sdk.integrations.logging import LoggingIntegration - from sentry_sdk.integrations.redis import RedisIntegration - - sentry_sdk.init( - dsn=SENTRY_DSN, - environment=ENV, - integrations=[ - DjangoIntegration(), - CeleryIntegration(), - RedisIntegration(), - LoggingIntegration( - level=logging.INFO, # Capture info and above as breadcrumbs - event_level=logging.ERROR, # Send error events from log messages - ), - ], - ) + init_sentry(SENTRY_DSN, ENV) + +FETCH_SENTRY_DSN_RETRY_INTERVAL = env.float("FETCH_SENTRY_DSN_RETRY_INTERVAL", default=30 * 60) diff --git a/validator/app/src/compute_horde_validator/validator/apps.py b/validator/app/src/compute_horde_validator/validator/apps.py index fdb569067..cb790b770 100644 --- a/validator/app/src/compute_horde_validator/validator/apps.py +++ b/validator/app/src/compute_horde_validator/validator/apps.py @@ -1,9 +1,14 @@ import logging +import threading +import time from django.apps import AppConfig from django.conf import settings from django.db.models.signals import post_migrate +from compute_horde_validator.validator.sentry import init_sentry +from compute_horde_validator.validator.stats_collector_client import StatsCollectorClient + logger = logging.getLogger(__name__) @@ -27,8 +32,44 @@ def maybe_create_default_admin(sender, **kwargs): ) +def maybe_init_sentry_from_stats_collector(): + if settings.SENTRY_DSN: + return + + def target(): + # Fetch DSN from stats collector + logger.info("Fetching Sentry DSN from stats collector") + dsn = None + while True: + try: + client = StatsCollectorClient() + dsn = client.get_sentry_dsn() + except Exception as e: + logger.error("Error while fetching Sentry DSN from stats collector: %s", e) + + if dsn: + logger.info("Initializing Sentry with custom DSN") + init_sentry(dsn, settings.ENV) + return + + if settings.FETCH_SENTRY_DSN_RETRY_INTERVAL <= 0: + return + + logger.info( + "Fetching Sentry DSN: retrying in %.1fs", settings.FETCH_SENTRY_DSN_RETRY_INTERVAL + ) + # No DSN or connection error, sleep and retry + time.sleep(settings.FETCH_SENTRY_DSN_RETRY_INTERVAL) + + thread = threading.Thread(target=target, daemon=True) + thread.start() + # Not waiting for thread to finish to not block the app start. + # If an exception is raised in the thread, it will be ignored. + + class ValidatorConfig(AppConfig): name = "compute_horde_validator.validator" def ready(self): post_migrate.connect(maybe_create_default_admin, sender=self) + maybe_init_sentry_from_stats_collector() diff --git a/validator/app/src/compute_horde_validator/validator/sentry.py b/validator/app/src/compute_horde_validator/validator/sentry.py new file mode 100644 index 000000000..bf0921c31 --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/sentry.py @@ -0,0 +1,23 @@ +import logging + + +def init_sentry(dsn: str, environment: str) -> None: + import sentry_sdk + from sentry_sdk.integrations.celery import CeleryIntegration + from sentry_sdk.integrations.django import DjangoIntegration + from sentry_sdk.integrations.logging import LoggingIntegration + from sentry_sdk.integrations.redis import RedisIntegration + + sentry_sdk.init( + dsn=dsn, + environment=environment, + integrations=[ + DjangoIntegration(), + CeleryIntegration(), + RedisIntegration(), + LoggingIntegration( + level=logging.INFO, # Capture info and above as breadcrumbs + event_level=logging.ERROR, # Send error events from log messages + ), + ], + ) diff --git a/validator/app/src/compute_horde_validator/validator/stats_collector_client.py b/validator/app/src/compute_horde_validator/validator/stats_collector_client.py new file mode 100644 index 000000000..59fe18693 --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/stats_collector_client.py @@ -0,0 +1,95 @@ +import json +import logging +import time +from typing import TYPE_CHECKING, Any + +import pydantic +import requests +from django.conf import settings + +if TYPE_CHECKING: + from compute_horde_validator.validator.models import SystemEvent + +logger = logging.getLogger(__name__) + + +class ValidatorSentryDSNResponse(pydantic.BaseModel): + sentry_dsn: str | None + + +def get_keypair(): + return settings.BITTENSOR_WALLET().get_hotkey() + + +class StatsCollectorError(RuntimeError): + @property + def response(self) -> requests.Response | None: + if isinstance(self.__cause__, requests.RequestException): + return self.__cause__.response + return None + + +class StatsCollectorClient: + def __init__(self, url: str = settings.STATS_COLLECTOR_URL): + if not url.endswith("/"): + url += "/" + self.url = url + self.keypair = get_keypair() + self.timeout = 5 # seconds + + def send_events(self, events: list["SystemEvent"]) -> None: + hotkey = self.keypair.ss58_address + data = [event.to_dict() for event in events] + path = f"validator/{hotkey}/system_events" + + self.make_request("POST", path, data=data) + + def get_sentry_dsn(self) -> str | None: + hotkey = self.keypair.ss58_address + + response = self.make_request("GET", f"validator/{hotkey}/sentry_dsn") + + try: + model = ValidatorSentryDSNResponse.model_validate_json(response) + except pydantic.ValidationError as e: + raise StatsCollectorError("Malformed response") from e + + return model.sentry_dsn + + def make_request( + self, + method: str, + path: str, + *, + headers: dict[str, str] | None = None, + data: Any = None, + **kwargs, + ) -> str: + url = self.url + path + logger.info("%s %s", method, url) + try: + response = requests.request( + method, + self.url + path, + json=data, + headers={**self.get_signature_headers(), **(headers or {})}, + timeout=self.timeout, + **kwargs, + ) + response.raise_for_status() + return response.text + except requests.RequestException as e: + raise StatsCollectorError(repr(e)) from e + + def get_signature_headers(self): + hotkey = self.keypair.ss58_address + signing_timestamp = int(time.time()) + to_sign = json.dumps( + {"signing_timestamp": signing_timestamp, "validator_ss58_address": hotkey}, + sort_keys=True, + ) + signature = f"0x{self.keypair.sign(to_sign).hex()}" + return { + "Validator-Signature": signature, + "Validator-Signing-Timestamp": str(signing_timestamp), + } diff --git a/validator/app/src/compute_horde_validator/validator/tasks.py b/validator/app/src/compute_horde_validator/validator/tasks.py index 2fb730ed8..80507d35b 100644 --- a/validator/app/src/compute_horde_validator/validator/tasks.py +++ b/validator/app/src/compute_horde_validator/validator/tasks.py @@ -1,5 +1,4 @@ import contextlib -import json import random import time import traceback @@ -12,7 +11,6 @@ import bittensor import celery.exceptions import numpy as np -import requests from asgiref.sync import async_to_sync from bittensor.utils.weight_utils import process_weights_for_netuid from celery import shared_task @@ -69,6 +67,7 @@ from . import eviction from .models import AdminJobRequest from .scoring import score_batches +from .stats_collector_client import StatsCollectorClient, StatsCollectorError logger = get_task_logger(__name__) @@ -1102,33 +1101,19 @@ def send_events_to_facilitator(): logger.warning("STATS_COLLECTOR_URL is not set, not sending system events") return - keypair = get_keypair() - hotkey = keypair.ss58_address - signing_timestamp = int(time.time()) - to_sign = json.dumps( - {"signing_timestamp": signing_timestamp, "validator_ss58_address": hotkey}, - sort_keys=True, - ) - signature = f"0x{keypair.sign(to_sign).hex()}" events = list(events_qs) - data = [event.to_dict() for event in events] - url = settings.STATS_COLLECTOR_URL + f"validator/{hotkey}/system_events" - response = requests.post( - url, - json=data, - headers={ - "Validator-Signature": signature, - "Validator-Signing-Timestamp": str(signing_timestamp), - }, - ) - - if response.status_code == 201: - logger.info(f"Sent {len(data)} system events to facilitator") + client = StatsCollectorClient() + try: + client.send_events(events) + logger.info(f"Sent {len(events)} system events to facilitator") SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).filter( id__in=[event.id for event in events] ).update(sent=True) - else: - logger.error(f"Failed to send system events to facilitator: {response}") + except StatsCollectorError as e: + msg = "Failed to send system events to facilitator" + if e.response is not None: + msg = f"{msg}: {e.response}" + logger.error(msg) @app.task diff --git a/validator/app/src/compute_horde_validator/validator/tests/settings.py b/validator/app/src/compute_horde_validator/validator/tests/settings.py index b711cd4c2..c792cf589 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/settings.py +++ b/validator/app/src/compute_horde_validator/validator/tests/settings.py @@ -49,3 +49,5 @@ def BITTENSOR_WALLET() -> bittensor.wallet: # type: ignore TRUSTED_MINER_PORT = 1234 CACHES = {"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}} + +SENTRY_DSN = "fake_sentry_dsn" diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_stats_collector_client.py b/validator/app/src/compute_horde_validator/validator/tests/test_stats_collector_client.py new file mode 100644 index 000000000..4b6c3fb95 --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/tests/test_stats_collector_client.py @@ -0,0 +1,127 @@ +import pytest +import responses +from django.conf import settings + +from compute_horde_validator.validator.models import SystemEvent +from compute_horde_validator.validator.stats_collector_client import ( + StatsCollectorClient, + StatsCollectorError, +) +from compute_horde_validator.validator.tests.helpers import get_keypair + +STATS_COLLECTOR_URL = "http://localhost:1234/" + + +def create_system_events() -> list[SystemEvent]: + events = SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS) + return [ + events.create( + type=SystemEvent.EventType.WEIGHT_SETTING_SUCCESS, + subtype=SystemEvent.EventSubType.SUCCESS, + data={}, + sent=False, + ), + events.create( + type=SystemEvent.EventType.WEIGHT_SETTING_FAILURE, + subtype=SystemEvent.EventSubType.GENERIC_ERROR, + data={}, + sent=False, + ), + events.create( + type=SystemEvent.EventType.WEIGHT_SETTING_FAILURE, + subtype=SystemEvent.EventSubType.GENERIC_ERROR, + data={}, + sent=True, + ), + ] + + +@pytest.mark.django_db(databases=["default", "default_alias"]) +@responses.activate +def test_send_events_ok(): + hotkey = get_keypair().ss58_address + responses.add( + responses.POST, f"{STATS_COLLECTOR_URL}validator/{hotkey}/system_events", status=201 + ) + client = StatsCollectorClient(STATS_COLLECTOR_URL) + events = create_system_events() + client.send_events(events) + + +@pytest.mark.django_db(databases=["default", "default_alias"]) +@responses.activate +def test_send_events_unauthenticated(): + hotkey = get_keypair().ss58_address + responses.add( + responses.POST, f"{STATS_COLLECTOR_URL}validator/{hotkey}/system_events", status=401 + ) + client = StatsCollectorClient(STATS_COLLECTOR_URL) + events = create_system_events() + with pytest.raises(StatsCollectorError): + client.send_events(events) + + +@pytest.mark.django_db(databases=["default", "default_alias"]) +@responses.activate +def test_send_events_bad_request(): + hotkey = get_keypair().ss58_address + responses.add( + responses.POST, f"{STATS_COLLECTOR_URL}validator/{hotkey}/system_events", status=400 + ) + client = StatsCollectorClient(STATS_COLLECTOR_URL) + events = create_system_events() + with pytest.raises(StatsCollectorError): + client.send_events(events) + + +@pytest.mark.django_db(databases=["default", "default_alias"]) +@responses.activate +def test_get_sentry_dsn_ok(): + hotkey = get_keypair().ss58_address + responses.add( + responses.GET, + f"{STATS_COLLECTOR_URL}validator/{hotkey}/sentry_dsn", + json={"sentry_dsn": "https://public@sentry.example.com/1"}, + ) + client = StatsCollectorClient(STATS_COLLECTOR_URL) + dsn = client.get_sentry_dsn() + assert dsn == "https://public@sentry.example.com/1" + + +@pytest.mark.django_db(databases=["default", "default_alias"]) +@responses.activate +def test_get_sentry_dsn_unauthenticated(): + hotkey = get_keypair().ss58_address + responses.add(responses.GET, f"{STATS_COLLECTOR_URL}validator/{hotkey}/sentry_dsn", status=401) + client = StatsCollectorClient(STATS_COLLECTOR_URL) + with pytest.raises(StatsCollectorError): + client.get_sentry_dsn() + + +@pytest.mark.django_db(databases=["default", "default_alias"]) +@responses.activate +def test_get_sentry_dsn_not_found(): + hotkey = get_keypair().ss58_address + responses.add( + responses.GET, + f"{STATS_COLLECTOR_URL}validator/{hotkey}/sentry_dsn", + json={"sentry_dsn": None}, + ) + client = StatsCollectorClient(STATS_COLLECTOR_URL) + dsn = client.get_sentry_dsn() + assert dsn is None + + +@pytest.mark.django_db(databases=["default", "default_alias"]) +@responses.activate +def test_get_sentry_dsn_invalid_response(): + hotkey = get_keypair().ss58_address + responses.add( + responses.GET, + f"{STATS_COLLECTOR_URL}validator/{hotkey}/sentry_dsn", + json={"foo": "bar"}, + status=200, + ) + client = StatsCollectorClient(STATS_COLLECTOR_URL) + with pytest.raises(StatsCollectorError): + client.get_sentry_dsn() diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_tasks.py b/validator/app/src/compute_horde_validator/validator/tests/test_tasks.py index 2a8fd42e0..445e9eb37 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_tasks.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_tasks.py @@ -15,6 +15,10 @@ SyntheticJobBatch, SystemEvent, ) +from compute_horde_validator.validator.stats_collector_client import ( + StatsCollectorClient, + StatsCollectorError, +) from compute_horde_validator.validator.tasks import ( ScheduleError, calculate_job_start_block, @@ -35,6 +39,10 @@ ) +def raise_error(e: Exception | type[Exception]): + raise e + + @patch("compute_horde_validator.validator.tasks.get_miner_axon_info", mock_get_miner_axon_info) @patch("compute_horde_validator.validator.tasks.MinerClient", MockMinerClient) @pytest.mark.django_db(databases=["default", "default_alias"], transaction=True) @@ -114,9 +122,10 @@ def get_response(status): return response -@patch( - "compute_horde_validator.validator.tasks.requests.post", - lambda url, json, headers: get_response(201), +@patch.object( + StatsCollectorClient, + "send_events", + lambda self, events: None, ) @pytest.mark.django_db(databases=["default", "default_alias"]) def test_send_events_to_facilitator__success(): @@ -125,9 +134,10 @@ def test_send_events_to_facilitator__success(): assert SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).filter(sent=True).count() == 3 -@patch( - "compute_horde_validator.validator.tasks.requests.post", - lambda url, json, headers: get_response(400), +@patch.object( + StatsCollectorClient, + "send_events", + lambda self, events: raise_error(StatsCollectorError), ) @pytest.mark.django_db(databases=["default", "default_alias"]) def test_send_events_to_facilitator__failure(): diff --git a/validator/pyproject.toml b/validator/pyproject.toml index a4eba606c..d88a9b480 100644 --- a/validator/pyproject.toml +++ b/validator/pyproject.toml @@ -56,6 +56,7 @@ test = [ "freezegun", "pytest-mock>=3.14.0", "moto[s3]>=5.0.13", + "responses>=0.25.3", ] format = [ "ruff", diff --git a/validator/uv.lock b/validator/uv.lock index ca88c44d1..78529f8fb 100644 --- a/validator/uv.lock +++ b/validator/uv.lock @@ -2603,6 +2603,7 @@ test = [ { name = "pytest-httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "pytest-mock", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, { name = "pytest-xdist", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, + { name = "responses", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] type-check = [ { name = "celery-types", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, @@ -2663,6 +2664,7 @@ test = [ { name = "pytest-httpx" }, { name = "pytest-mock", specifier = ">=3.14.0" }, { name = "pytest-xdist" }, + { name = "responses", specifier = ">=0.25.3" }, ] type-check = [ { name = "celery-types", specifier = ">=0.22.0" },