Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 5 additions & 20 deletions validator/app/src/compute_horde_validator/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

import inspect
import logging
import pathlib
from datetime import timedelta
from functools import wraps
Expand All @@ -13,6 +12,8 @@
from celery.schedules import crontab
from compute_horde import base # noqa

from compute_horde_validator.validator.sentry import init_sentry

root = environ.Path(__file__) - 2

env = environ.Env(DEBUG=(bool, False))
Expand Down Expand Up @@ -641,22 +642,6 @@ def BITTENSOR_WALLET() -> bittensor.wallet:

# Sentry
if SENTRY_DSN := env("SENTRY_DSN", default=""):
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.redis import RedisIntegration

sentry_sdk.init(
dsn=SENTRY_DSN,
environment=ENV,
integrations=[
DjangoIntegration(),
CeleryIntegration(),
RedisIntegration(),
LoggingIntegration(
level=logging.INFO, # Capture info and above as breadcrumbs
event_level=logging.ERROR, # Send error events from log messages
),
],
)
init_sentry(SENTRY_DSN, ENV)

FETCH_SENTRY_DSN_RETRY_INTERVAL = env.float("FETCH_SENTRY_DSN_RETRY_INTERVAL", default=30 * 60)
41 changes: 41 additions & 0 deletions validator/app/src/compute_horde_validator/validator/apps.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import logging
import threading
import time

from django.apps import AppConfig
from django.conf import settings
from django.db.models.signals import post_migrate

from compute_horde_validator.validator.sentry import init_sentry
from compute_horde_validator.validator.stats_collector_client import StatsCollectorClient

logger = logging.getLogger(__name__)


Expand All @@ -27,8 +32,44 @@ def maybe_create_default_admin(sender, **kwargs):
)


def maybe_init_sentry_from_stats_collector():
    """
    Start a background thread that configures Sentry with a DSN from the stats collector.

    Does nothing when ``settings.SENTRY_DSN`` is truthy. Otherwise a daemon thread asks
    the stats collector for a DSN until it gets a non-empty one, then passes it to
    ``init_sentry`` together with ``settings.ENV``. Between attempts the thread sleeps
    for ``settings.FETCH_SENTRY_DSN_RETRY_INTERVAL`` seconds; when that interval is not
    positive the thread gives up after the first unsuccessful attempt.
    """
    if settings.SENTRY_DSN:
        return

    def target():
        logger.info("Fetching Sentry DSN from stats collector")
        fetched_dsn = None
        while True:
            try:
                fetched_dsn = StatsCollectorClient().get_sentry_dsn()
            except Exception as exc:
                # Errors are only logged; the retry logic below decides what happens next.
                logger.error("Error while fetching Sentry DSN from stats collector: %s", exc)

            if fetched_dsn:
                break

            retry_interval = settings.FETCH_SENTRY_DSN_RETRY_INTERVAL
            if retry_interval <= 0:
                return

            # Either no DSN was returned or the request failed: wait, then try again.
            logger.info("Fetching Sentry DSN: retrying in %.1fs", retry_interval)
            time.sleep(retry_interval)

        logger.info("Initializing Sentry with custom DSN")
        init_sentry(fetched_dsn, settings.ENV)

    # The thread is deliberately not joined so that app start-up is never blocked.
    # An exception escaping the thread is not propagated to the caller.
    threading.Thread(target=target, daemon=True).start()


class ValidatorConfig(AppConfig):
    """Django app configuration for ``compute_horde_validator.validator``."""

    name = "compute_horde_validator.validator"

    def ready(self):
        """Connect the post-migrate admin hook, then start the Sentry DSN lookup."""
        post_migrate.connect(receiver=maybe_create_default_admin, sender=self)
        maybe_init_sentry_from_stats_collector()
23 changes: 23 additions & 0 deletions validator/app/src/compute_horde_validator/validator/sentry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import logging


def init_sentry(dsn: str, environment: str) -> None:
    """
    Initialize the Sentry SDK with the Django, Celery, Redis and logging integrations.

    :param dsn: Sentry DSN passed to ``sentry_sdk.init``.
    :param environment: environment name passed to ``sentry_sdk.init``.
    """
    # The imports live inside the function, so sentry_sdk is only imported when
    # this function is actually called.
    import sentry_sdk
    from sentry_sdk.integrations.celery import CeleryIntegration
    from sentry_sdk.integrations.django import DjangoIntegration
    from sentry_sdk.integrations.logging import LoggingIntegration
    from sentry_sdk.integrations.redis import RedisIntegration

    enabled_integrations = [
        DjangoIntegration(),
        CeleryIntegration(),
        RedisIntegration(),
    ]
    enabled_integrations.append(
        LoggingIntegration(
            level=logging.INFO,  # Capture info and above as breadcrumbs
            event_level=logging.ERROR,  # Send error events from log messages
        )
    )

    sentry_sdk.init(
        dsn=dsn,
        environment=environment,
        integrations=enabled_integrations,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import json
import logging
import time
from typing import TYPE_CHECKING, Any

import pydantic
import requests
from django.conf import settings

if TYPE_CHECKING:
from compute_horde_validator.validator.models import SystemEvent

logger = logging.getLogger(__name__)


class ValidatorSentryDSNResponse(pydantic.BaseModel):
    """Response body of the stats collector's validator ``sentry_dsn`` endpoint."""

    # DSN string, or None when the response carries a null value.
    sentry_dsn: str | None


def get_keypair():
    """Return the hotkey of the wallet produced by ``settings.BITTENSOR_WALLET()``."""
    wallet = settings.BITTENSOR_WALLET()
    return wallet.get_hotkey()


class StatsCollectorError(RuntimeError):
    """Error raised by ``StatsCollectorClient`` for failed requests or malformed responses."""

    @property
    def response(self) -> requests.Response | None:
        """
        HTTP response carried by the underlying ``requests`` exception.

        Returns ``None`` when this error was not raised from a
        ``requests.RequestException``.
        """
        cause = self.__cause__
        return cause.response if isinstance(cause, requests.RequestException) else None


class StatsCollectorClient:
    """
    HTTP client for the stats collector API.

    Every request is sent with the signature headers built by
    ``get_signature_headers`` from the validator hotkey.
    """

    def __init__(self, url: str | None = None):
        """
        :param url: base URL of the stats collector. When omitted,
            ``settings.STATS_COLLECTOR_URL`` is read at construction time.
        """
        # Resolve the default here rather than in the signature: a default argument is
        # evaluated once, when the module is imported, which would freeze the setting's
        # value and ignore any later override (e.g. in tests).
        if url is None:
            url = settings.STATS_COLLECTOR_URL
        # Paths are appended directly to the base URL, so it must end with a slash.
        if not url.endswith("/"):
            url += "/"
        self.url = url
        self.keypair = get_keypair()
        self.timeout = 5  # seconds

    def send_events(self, events: list["SystemEvent"]) -> None:
        """
        POST the given system events to the stats collector.

        :param events: events to send; each one is serialized with ``to_dict()``.
        :raises StatsCollectorError: if the request fails or returns an error status.
        """
        hotkey = self.keypair.ss58_address
        data = [event.to_dict() for event in events]
        path = f"validator/{hotkey}/system_events"

        self.make_request("POST", path, data=data)

    def get_sentry_dsn(self) -> str | None:
        """
        Fetch the Sentry DSN for this validator from the stats collector.

        :return: the ``sentry_dsn`` value of the response, which may be ``None``.
        :raises StatsCollectorError: if the request fails or the response body does not
            validate against ``ValidatorSentryDSNResponse``.
        """
        hotkey = self.keypair.ss58_address

        response = self.make_request("GET", f"validator/{hotkey}/sentry_dsn")

        try:
            model = ValidatorSentryDSNResponse.model_validate_json(response)
        except pydantic.ValidationError as e:
            raise StatsCollectorError("Malformed response") from e

        return model.sentry_dsn

    def make_request(
        self,
        method: str,
        path: str,
        *,
        headers: dict[str, str] | None = None,
        data: Any = None,
        **kwargs,
    ) -> str:
        """
        Send a signed request to the stats collector and return the response text.

        :param method: HTTP method name.
        :param path: path appended to the client's base URL.
        :param headers: extra headers; they override signature headers of the same name.
        :param data: payload sent as the JSON body of the request.
        :param kwargs: additional keyword arguments forwarded to ``requests.request``.
        :return: the response body as text.
        :raises StatsCollectorError: on any ``requests.RequestException``, including
            error HTTP statuses reported by ``raise_for_status()``.
        """
        url = self.url + path
        logger.info("%s %s", method, url)
        try:
            response = requests.request(
                method,
                url,
                json=data,
                headers={**self.get_signature_headers(), **(headers or {})},
                timeout=self.timeout,
                **kwargs,
            )
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            raise StatsCollectorError(repr(e)) from e

    def get_signature_headers(self):
        """
        Build the authentication headers for a request.

        The signed message is a JSON object (keys sorted) holding the current Unix
        timestamp in whole seconds and the hotkey's ss58 address.

        :return: dict with the ``Validator-Signature`` (``0x``-prefixed hex) and
            ``Validator-Signing-Timestamp`` headers.
        """
        hotkey = self.keypair.ss58_address
        signing_timestamp = int(time.time())
        to_sign = json.dumps(
            {"signing_timestamp": signing_timestamp, "validator_ss58_address": hotkey},
            sort_keys=True,
        )
        signature = f"0x{self.keypair.sign(to_sign).hex()}"
        return {
            "Validator-Signature": signature,
            "Validator-Signing-Timestamp": str(signing_timestamp),
        }
35 changes: 10 additions & 25 deletions validator/app/src/compute_horde_validator/validator/tasks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import contextlib
import json
import random
import time
import traceback
Expand All @@ -12,7 +11,6 @@
import bittensor
import celery.exceptions
import numpy as np
import requests
from asgiref.sync import async_to_sync
from bittensor.utils.weight_utils import process_weights_for_netuid
from celery import shared_task
Expand Down Expand Up @@ -69,6 +67,7 @@
from . import eviction
from .models import AdminJobRequest
from .scoring import score_batches
from .stats_collector_client import StatsCollectorClient, StatsCollectorError

logger = get_task_logger(__name__)

Expand Down Expand Up @@ -1102,33 +1101,19 @@ def send_events_to_facilitator():
logger.warning("STATS_COLLECTOR_URL is not set, not sending system events")
return

keypair = get_keypair()
hotkey = keypair.ss58_address
signing_timestamp = int(time.time())
to_sign = json.dumps(
{"signing_timestamp": signing_timestamp, "validator_ss58_address": hotkey},
sort_keys=True,
)
signature = f"0x{keypair.sign(to_sign).hex()}"
events = list(events_qs)
data = [event.to_dict() for event in events]
url = settings.STATS_COLLECTOR_URL + f"validator/{hotkey}/system_events"
response = requests.post(
url,
json=data,
headers={
"Validator-Signature": signature,
"Validator-Signing-Timestamp": str(signing_timestamp),
},
)

if response.status_code == 201:
logger.info(f"Sent {len(data)} system events to facilitator")
client = StatsCollectorClient()
try:
client.send_events(events)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

logger.info(f"Sent {len(events)} system events to facilitator")
SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).filter(
id__in=[event.id for event in events]
).update(sent=True)
else:
logger.error(f"Failed to send system events to facilitator: {response}")
except StatsCollectorError as e:
msg = "Failed to send system events to facilitator"
if e.response is not None:
msg = f"{msg}: {e.response}"
logger.error(msg)


@app.task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@ def BITTENSOR_WALLET() -> bittensor.wallet: # type: ignore
TRUSTED_MINER_PORT = 1234

CACHES = {"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}}

SENTRY_DSN = "fake_sentry_dsn"
Loading
Loading