diff --git a/validator/app/src/compute_horde_validator/validator/management/commands/transfer_receipts.py b/validator/app/src/compute_horde_validator/validator/management/commands/transfer_receipts.py index bdd43d85d..567fc33b4 100644 --- a/validator/app/src/compute_horde_validator/validator/management/commands/transfer_receipts.py +++ b/validator/app/src/compute_horde_validator/validator/management/commands/transfer_receipts.py @@ -1,31 +1,7 @@ -import asyncio -import logging -import time -from collections import defaultdict -from collections.abc import Awaitable, Callable, Sequence -from datetime import datetime, timedelta - -import aiohttp from asgiref.sync import async_to_sync -from compute_horde.receipts.store.local import N_ACTIVE_PAGES, LocalFilesystemPagedReceiptStore -from compute_horde.receipts.transfer import ( - MinerInfo, - ReceiptsTransfer, - TransferResult, -) -from django.conf import settings from django.core.management import BaseCommand -from django.utils import timezone -from prometheus_client import Counter, Gauge, Histogram - -from compute_horde_validator.validator.dynamic_config import aget_config -from compute_horde_validator.validator.models import MetagraphSnapshot, Miner - -logger = logging.getLogger(__name__) - -class TransferIsDisabled(Exception): - pass +from compute_horde_validator.validator.receipts.default import Receipts class Command(BaseCommand): @@ -34,37 +10,6 @@ class Command(BaseCommand): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.m_receipts = Counter( - "receipttransfer_receipts_total", - documentation="Number of transferred receipts", - ) - self.m_miners = Gauge( - "receipttransfer_miners", - documentation="Number of miners to transfer from", - ) - self.m_successful_transfers = Counter( - "receipttransfer_successful_transfers_total", - documentation="Number of transfers that didn't explicitly fail. (this includes 404s though)", - ) - self.m_line_errors = Counter( - "receipttransfer_line_errors_total", - labelnames=["exc_type"], - documentation="Number of invalid lines in received pages", - ) - self.m_transfer_errors = Counter( - "receipttransfer_transfer_errors_total", - labelnames=["exc_type"], - documentation="Number of completely failed page transfers", - ) - self.m_transfer_duration = Histogram( - "receipttransfer_transfer_duration", - documentation="Total time to transfer latest page deltas from all miners", - ) - self.m_catchup_pages_left = Gauge( - "receipttransfer_catchup_pages_left", - documentation="Pages waiting for catch-up", - ) - def add_arguments(self, parser): parser.add_argument( "--debug-miner-hotkey", @@ -97,217 +42,10 @@ async def handle( debug_miner_port: int | None, **kwargs, ): - if (debug_miner_hotkey, debug_miner_ip, debug_miner_port) != (None, None, None): - # 1st, use explicitly specified miner if available - if None in {debug_miner_hotkey, debug_miner_ip, debug_miner_port}: - raise ValueError("Either none or all of explicit miner details must be provided") - miner = [debug_miner_hotkey, debug_miner_ip, debug_miner_port] - logger.info(f"Will fetch receipts from explicit miner: {miner}") - - async def miners(): - return [miner] - - elif settings.DEBUG_FETCH_RECEIPTS_FROM_MINERS: - # 2nd, if debug miners are specified, they take precedence. 
- debug_miners = settings.DEBUG_FETCH_RECEIPTS_FROM_MINERS - logger.info(f"Will fetch receipts from {len(debug_miners)} debug miners") - - async def miners(): - return debug_miners - - else: - # 3rd, if no specific miners were specified, get from metagraph snapshot. - logger.info("Will fetch receipts from metagraph snapshot miners") - - async def miners(): - snapshot = await MetagraphSnapshot.aget_latest() - serving_hotkeys = snapshot.serving_hotkeys - serving_miners = [m async for m in Miner.objects.filter(hotkey__in=serving_hotkeys)] - return [(m.hotkey, m.address, m.port) for m in serving_miners] - - # IMPORTANT: This encompasses at least the current and the previous cycle. - cutoff = timezone.now() - timedelta(hours=5) - - """ - General considerations: - - higher concurrency: - - higher bandwidth use - - more parallel CPU-heavy signature check tasks -> steal CPU time from asyncio thread (GIL) - - lower concurrency: - - slows down the process due to higher influence of network latency - - higher allowed request timeout: - - one slow miner may stall the whole process for longer - - less timeouts due to CPU time being stolen by CPU heavy tasks - """ - - if daemon: - while True: - try: - await self.run_in_loop(cutoff, miners) - except TransferIsDisabled: - # Sleep instead of exiting in case the transfer gets dynamically re-enabled. - logger.info("Transfer is currently disabled. Sleeping for a minute.") - await asyncio.sleep(60) - else: - await self.run_once(cutoff, miners) - - async def run_once( - self, cutoff: datetime, miners: Callable[[], Awaitable[list[MinerInfo]]] - ) -> None: - catchup_cutoff_page = LocalFilesystemPagedReceiptStore.current_page_at(cutoff) - current_page = LocalFilesystemPagedReceiptStore.current_page() - async with aiohttp.ClientSession() as session: - await self.catch_up( - # Pull all pages from newest to oldest - pages=list(reversed(range(catchup_cutoff_page, current_page + 1))), - miners=miners, - session=session, - semaphore=asyncio.Semaphore(50), - ) - - async def run_in_loop( - self, cutoff: datetime, miners: Callable[[], Awaitable[list[MinerInfo]]] - ) -> None: - """ - Do a full catch-up + listen for changes in latest 2 pages indefinitely - """ - catchup_cutoff_page = LocalFilesystemPagedReceiptStore.current_page_at(cutoff) - current_page = LocalFilesystemPagedReceiptStore.current_page() - - # TCP adds significant overhead - it's important to reuse connections. - async with aiohttp.ClientSession() as session: - # Catch-up with the latest pages so that the "keep up" loop has easier time later - await self.catch_up( - pages=list(reversed(range(current_page - N_ACTIVE_PAGES + 1, current_page + 1))), - miners=miners, - session=session, - semaphore=asyncio.Semaphore(50), - ) - await asyncio.gather( - # Slowly catch up with non-active pages, newest first - self.catch_up( - pages=list( - reversed(range(catchup_cutoff_page, current_page - N_ACTIVE_PAGES + 1)) - ), - miners=miners, - session=session, - # Throttle this lower so that it doesn't choke the "keep up" loop - semaphore=asyncio.Semaphore(10), - ), - # Keep up with latest pages continuously in parallel - self.keep_up( - miners=miners, - session=session, - semaphore=asyncio.Semaphore(50), - ), - ) - - async def catch_up( - self, - pages: Sequence[int], - miners: Callable[[], Awaitable[list[MinerInfo]]], - session: aiohttp.ClientSession, - semaphore: asyncio.Semaphore, - ) -> None: - """ - Fetches new receipts on given pages one by one. 
- """ - for idx, page in enumerate(pages): - await self._throw_if_disabled() - - self.m_catchup_pages_left.set(len(pages) - idx) - start_time = time.monotonic() - current_loop_miners = await miners() - result = await ReceiptsTransfer.transfer( - miners=current_loop_miners, - pages=[page], - session=session, - semaphore=semaphore, - # We may need to download a lot of full pages, so the timeout is higher. - request_timeout=3.0, - ) - elapsed = time.monotonic() - start_time - - logger.info( - f"Catching up: " - f"{page=} ({idx + 1}/{len(pages)}) " - f"receipts={result.n_receipts} " - f"{elapsed=:.3f} " - f"successful_transfers={result.n_successful_transfers} " - f"transfer_errors={len(result.transfer_errors)} " - f"line_errors={len(result.line_errors)} " - ) - - self._push_common_metrics(result) - self.m_catchup_pages_left.set(0) - - async def keep_up( - self, - miners: Callable[[], Awaitable[list[MinerInfo]]], - session: aiohttp.ClientSession, - semaphore: asyncio.Semaphore, - ) -> None: - """ - Runs indefinitely and polls for changes in active pages every `interval`. - """ - while True: - await self._throw_if_disabled() - interval: int = await aget_config("DYNAMIC_RECEIPT_TRANSFER_INTERVAL") - - start_time = time.monotonic() - current_page = LocalFilesystemPagedReceiptStore.current_page() - pages = list(reversed(range(current_page - N_ACTIVE_PAGES + 1, current_page + 1))) - current_loop_miners = await miners() - result = await ReceiptsTransfer.transfer( - miners=current_loop_miners, - pages=pages, - session=session, - semaphore=semaphore, - request_timeout=1.0, - ) - elapsed = time.monotonic() - start_time - - logger.info( - f"Keeping up: " - f"{pages=} " - f"receipts={result.n_receipts} " - f"{elapsed=:.3f} " - f"successful_transfers={result.n_successful_transfers} " - f"transfer_errors={len(result.transfer_errors)} " - f"line_errors={len(result.line_errors)} " - ) - - self._push_common_metrics(result) - self.m_miners.set(len(current_loop_miners)) - self.m_transfer_duration.observe(elapsed) - - # Sleep for the remainder of the time if any - if elapsed < interval: - time.sleep(interval - elapsed) - - def _push_common_metrics(self, result: TransferResult) -> None: - # Push line error counts grouped by the exception type - n_line_errors: defaultdict[type[Exception], int] = defaultdict(int) - for line_error in result.line_errors: - n_line_errors[type(line_error)] += 1 - for exc_type, exc_count in n_line_errors.items(): - self.m_line_errors.labels(exc_type=exc_type.__name__).inc(exc_count) - - # Push transfer error counts grouped by the exception type - n_transfer_errors: defaultdict[type[Exception], int] = defaultdict(int) - for transfer_error in result.transfer_errors: - n_transfer_errors[type(transfer_error)] += 1 - for exc_type, exc_count in n_transfer_errors.items(): - self.m_transfer_errors.labels(exc_type=exc_type.__name__).inc(exc_count) - - self.m_receipts.inc(result.n_receipts) - self.m_successful_transfers.inc(result.n_successful_transfers) - - async def _throw_if_disabled(self): - try: - if await aget_config("DYNAMIC_RECEIPT_TRANSFER_ENABLED"): - return - except KeyError: - logger.warning("DYNAMIC_RECEIPT_TRANSFER_ENABLED dynamic config is not set up!") - - raise TransferIsDisabled + await Receipts().run_receipts_transfer( + daemon=daemon, + debug_miner_hotkey=debug_miner_hotkey, + debug_miner_ip=debug_miner_ip, + debug_miner_port=debug_miner_port, + ) + return diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py 
b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py index 4a365db7e..2fbb8503c 100644 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py +++ b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver.py @@ -34,7 +34,6 @@ MinerToValidatorMessage, V0StreamingJobReadyRequest, ) -from compute_horde.receipts.models import JobStartedReceipt from compute_horde_core.executor_class import ExecutorClass from django.conf import settings from django.db.models import F @@ -51,6 +50,7 @@ SystemEvent, ) from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient +from compute_horde_validator.validator.receipts.default import Receipts from compute_horde_validator.validator.routing.types import JobRoute from compute_horde_validator.validator.utils import TRUSTED_MINER_FAKE_KEY @@ -374,33 +374,40 @@ async def streaming_ready_callback(msg: V0StreamingJobReadyRequest) -> None: ) # As far as the validator is concerned, the job is as good as failed system_event_subtype = SystemEvent.EventSubType.JOB_REJECTED else: # rejection.msg.reason == JobRejectionReason.BUSY - job_request_time = ( - await JobStartedReceipt.objects.aget(job_uuid=job.job_uuid) - ).timestamp - valid_excuses = await job_excuses.filter_valid_excuse_receipts( - receipts_to_check=rejection.msg.receipts or [], - check_time=job_request_time, - declined_job_uuid=str(job.job_uuid), - declined_job_executor_class=ExecutorClass(job.executor_class), - declined_job_is_synthetic=False, - minimum_validator_stake_for_excuse=await aget_config( - "DYNAMIC_MINIMUM_VALIDATOR_STAKE_FOR_EXCUSE" - ), - miner_hotkey=job.miner.hotkey, - ) - expected_executor_count = await job_excuses.get_expected_miner_executor_count( - check_time=job_request_time, - miner_hotkey=job.miner.hotkey, - executor_class=ExecutorClass(job.executor_class), + job_started_receipt = await Receipts().get_job_started_receipt_by_uuid( + str(job.job_uuid) ) - if len(valid_excuses) >= expected_executor_count: - comment = "Miner properly excused job" - status = OrganicJob.Status.EXCUSED - system_event_subtype = SystemEvent.EventSubType.JOB_EXCUSED - else: + if job_started_receipt is None: + logger.error(f"No job started receipt found for job {job.job_uuid}") comment = "Miner failed to excuse job" status = OrganicJob.Status.FAILED system_event_subtype = SystemEvent.EventSubType.JOB_REJECTED + else: + job_request_time = job_started_receipt.timestamp + valid_excuses = await job_excuses.filter_valid_excuse_receipts( + receipts_to_check=rejection.msg.receipts or [], + check_time=job_request_time, + declined_job_uuid=str(job.job_uuid), + declined_job_executor_class=ExecutorClass(job.executor_class), + declined_job_is_synthetic=False, + minimum_validator_stake_for_excuse=await aget_config( + "DYNAMIC_MINIMUM_VALIDATOR_STAKE_FOR_EXCUSE" + ), + miner_hotkey=job.miner.hotkey, + ) + expected_executor_count = await job_excuses.get_expected_miner_executor_count( + check_time=job_request_time, + miner_hotkey=job.miner.hotkey, + executor_class=ExecutorClass(job.executor_class), + ) + if len(valid_excuses) >= expected_executor_count: + comment = "Miner properly excused job" + status = OrganicJob.Status.EXCUSED + system_event_subtype = SystemEvent.EventSubType.JOB_EXCUSED + else: + comment = "Miner failed to excuse job" + status = OrganicJob.Status.FAILED + system_event_subtype = SystemEvent.EventSubType.JOB_REJECTED logger.info(comment) job.comment = comment diff --git 
a/validator/app/src/compute_horde_validator/validator/receipts/README.md b/validator/app/src/compute_horde_validator/validator/receipts/README.md
index 1ac15a818..15a37c50c 100644
--- a/validator/app/src/compute_horde_validator/validator/receipts/README.md
+++ b/validator/app/src/compute_horde_validator/validator/receipts/README.md
@@ -1,123 +1,105 @@
-# Receipts Module
+# Receipts module: public interface
 
-This module provides an interface for managing receipts in the validator.
+This module manages receipt creation and transfer between validators and miners.
 
-## Basic Usage
+## CLI entry point
 
-```python
-from compute_horde_validator.validator.receipts import Receipts
-
-# Create receipts manager
-receipts = Receipts()
-
-# Get completed job receipts for scoring
-completed_receipts = receipts.get_completed_job_receipts_for_block_range(
-    start_block=1000,
-    end_block=2000
-)
+- **Command**: `python manage.py transfer_receipts`
+- **Args**:
+  - `--daemon` (flag): run continuously; otherwise runs a single transfer cycle
+  - `--debug-miner-hotkey <hotkey>`: fetch only from this miner (debug)
+  - `--debug-miner-ip <ip>`: debug miner IP
+  - `--debug-miner-port <port>`: debug miner port
 
-# Create a job finished receipt
-receipt = receipts.create_job_finished_receipt(
-    job_uuid="job-123",
-    miner_hotkey="miner_hotkey",
-    validator_hotkey="validator_hotkey",
-    time_started=1640995200,
-    time_took_us=5000000,
-    score_str="0.85"
-)
+When all three debug miner parameters are provided, transfer runs in explicit mode for that miner. If not provided and `DEBUG_FETCH_RECEIPTS_FROM_MINERS` is set in settings, transfer runs against those debug miners. Otherwise, miners are resolved from the latest metagraph snapshot.
 
-# Save the receipt
-receipts.save_receipt(receipt.to_receipt())
+## Python API
 
-# Scrape receipts from miners
-scraped_receipts = await receipts.scrape_receipts_from_miners(["miner1", "miner2"])
-```
+The default implementation lives in `compute_horde_validator.validator.receipts.default.Receipts` and implements the abstract interface in `compute_horde_validator.validator.receipts.base.ReceiptsBase`.
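+A minimal usage sketch (assumes the validator's Django settings are configured; `main` is just an illustrative entry point):
+
+```python
+import asyncio
+
+from compute_horde_validator.validator.receipts import Receipts, ReceiptsBase
+
+
+async def main() -> None:
+    # Depend on the abstract interface so tests can substitute a fake implementation.
+    receipts: ReceiptsBase = Receipts()
+    # One transfer cycle; miners are resolved from the metagraph snapshot.
+    await receipts.run_receipts_transfer(
+        daemon=False,
+        debug_miner_hotkey=None,
+        debug_miner_ip=None,
+        debug_miner_port=None,
+    )
+
+
+asyncio.run(main())
+```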
-## Core Functionality
-
-### Receipts Retrieval
-
-The primary method for retrieve methods in given block range:
+- Run transfer loop (or once):
 
 ```python
-# Get completed job receipts for scoring
-completed_receipts = manager.get_completed_job_receipts_for_block_range(
-    start_block=1000,
-    end_block=2000
+await Receipts().run_receipts_transfer(
+    daemon=False,             # bool; True runs continuously
+    debug_miner_hotkey=None,  # str | None
+    debug_miner_ip=None,      # str | None
+    debug_miner_port=None,    # int | None
 )
 ```
 
-### Receipt Creation
-
-The module can create receipts for completed jobs:
+- Create receipts:
 
 ```python
-# Create job finished receipt
-receipt = receipts.create_job_finished_receipt(
-    job_uuid="job-123",
-    miner_hotkey="miner_hotkey",
-    validator_hotkey="validator_hotkey",
-    time_started=1640995200,  # Unix timestamp
-    time_took_us=5000000,  # 5 seconds in microseconds
-    score_str="0.85"
+payload, validator_signature = Receipts().create_job_started_receipt(
+    job_uuid=job_uuid,                  # str
+    miner_hotkey=miner_hotkey,          # str
+    validator_hotkey=validator_hotkey,  # str
+    executor_class=executor_class,      # str
+    is_organic=True,                    # bool
+    ttl=300,                            # int (seconds)
 )
-```
 
-### Receipt Scraping
+finished = Receipts().create_job_finished_receipt(
+    job_uuid=job_uuid,                  # str
+    miner_hotkey=miner_hotkey,          # str
+    validator_hotkey=validator_hotkey,  # str
+    time_started=time_started,          # datetime.datetime
+    time_took_us=5_000_000,             # int (microseconds)
+    score_str="0.85",                   # str
+)
+```
 
-The module can scrape receipts from miners:
+- Query receipts:
 
 ```python
-# Scrape receipts from specific miners
-scraped_receipts = await receipts.scrape_receipts_from_miners([
-    "miner_hotkey_1",
-    "miner_hotkey_2"
-], start_block=1000, end_block=2000)
-```
-
-### Receipt Persistence
+# All valid JobStarted receipts for a miner at a given timestamp
+started: list[JobStartedReceipt] = await Receipts().get_valid_job_started_receipts_for_miner(
+    miner_hotkey=miner_hotkey,  # str
+    at_time=at_time,            # datetime.datetime
+)
 
-The module provides methods to save and retrieve receipts:
+# JobFinished receipts for a miner and a set of job UUIDs
+finished: list[JobFinishedReceipt] = await Receipts().get_job_finished_receipts_for_miner(
+    miner_hotkey=miner_hotkey,  # str
+    job_uuids=job_uuids,        # list[str]
+)
 
-```python
-# Save a receipt to the database
-receipts.save_receipt(receipt)
+# JobStarted receipt by job UUID (None if absent)
+receipt: JobStartedReceipt | None = await Receipts().get_job_started_receipt_by_uuid(job_uuid)
 
-# Get a receipt by job UUID
-receipt = receipts.get_receipt_by_job_uuid("job-123")
+# Completed job receipts for a block range [start_block, end_block)
+completed: list[Receipt] = await Receipts().get_completed_job_receipts_for_block_range(
+    start_block=1000,  # int, inclusive
+    end_block=2000,    # int, exclusive
+)
 ```
 
-## Integration with compute_horde
+## Miner selection modes
 
-The receipts module uses the `compute_horde.receipts` module internally for:
-- Receipt models and schemas
-- Receipt validation and serialization
-- Receipt transfer functionality
-- Database models and migrations
+- **explicit**: when all `debug_miner_*` are passed to `run_receipts_transfer`
+- **debug_settings**: when `settings.DEBUG_FETCH_RECEIPTS_FROM_MINERS` is non-empty
+- **metagraph**: default; miners are taken from `MetagraphSnapshot`
 
-## Integration with Scoring
+## Configuration
 
-The receipts module is designed to work seamlessly with the scoring system:
+- **Dynamic config** (fetched via `aget_config`):
+  - `DYNAMIC_RECEIPT_TRANSFER_ENABLED: bool` — enable/disable transfer (default: `False`)
+  - `DYNAMIC_RECEIPT_TRANSFER_INTERVAL: int` — seconds between polling loops (default: `2`)
 
-1. **Block-based filtering**: Provides method to get receipts for specific block ranges
-2.
**Completed job receipts**: Specialized method for getting receipts of completed jobs -3. **Scoring data extraction**: Receipts contain all necessary data for scoring calculations -4. **Performance metrics**: Job finished receipts include timing and score information +- **Settings / env**: + - `DEBUG_FETCH_RECEIPTS_FROM_MINERS` — list of `"hotkey:ip:port"` values; in settings exposed as + `settings.DEBUG_FETCH_RECEIPTS_FROM_MINERS: list[tuple[str, str, int]]` + - `RECEIPT_TRANSFER_CHECKPOINT_CACHE` — cache key namespace used for checkpoints (default: `"receipts_checkpoints"`) -## Error Handling +## Metrics (Prometheus) -The module provides specific exceptions for different error scenarios: +- `receipttransfer_receipts_total` — number of transferred receipts +- `receipttransfer_miners` — number of miners in the current loop +- `receipttransfer_successful_transfers_total` — count of non-failed transfers +- `receipttransfer_line_errors_total{exc_type}` — per-exception count of line errors +- `receipttransfer_transfer_errors_total{exc_type}` — per-exception count of transfer errors +- `receipttransfer_transfer_duration` — histogram of total loop duration +- `receipttransfer_catchup_pages_left` — gauge of pages left to catch up -```python -from compute_horde_validator.validator.receipts.exceptions import ( - ReceiptsConfigurationError, - ReceiptsScrapingError, - ReceiptsGenerationError, -) -try: - receipt = receipts.create_job_finished_receipt(...) -except ReceiptsGenerationError as e: - # Handle generation error - pass -``` diff --git a/validator/app/src/compute_horde_validator/validator/receipts/__init__.py b/validator/app/src/compute_horde_validator/validator/receipts/__init__.py index 3b8c52002..48691a81b 100644 --- a/validator/app/src/compute_horde_validator/validator/receipts/__init__.py +++ b/validator/app/src/compute_horde_validator/validator/receipts/__init__.py @@ -4,14 +4,12 @@ This module provides receipts management functionality for the validator. 
""" -from .exceptions import ReceiptsConfigurationError, ReceiptsGenerationError, ReceiptsScrapingError -from .interface import ReceiptsBase -from .manager import Receipts +from .base import ReceiptsBase +from .default import Receipts +from .types import ReceiptsGenerationError __all__ = [ "ReceiptsBase", "Receipts", - "ReceiptsConfigurationError", "ReceiptsGenerationError", - "ReceiptsScrapingError", ] diff --git a/validator/app/src/compute_horde_validator/validator/receipts/base.py b/validator/app/src/compute_horde_validator/validator/receipts/base.py index 793e318f8..b402223c3 100644 --- a/validator/app/src/compute_horde_validator/validator/receipts/base.py +++ b/validator/app/src/compute_horde_validator/validator/receipts/base.py @@ -1,7 +1,10 @@ +import datetime from abc import ABC, abstractmethod from compute_horde.receipts import Receipt -from compute_horde.receipts.models import JobFinishedReceipt +from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import JobStartedReceiptPayload +from compute_horde.receipts.transfer import TransferResult class ReceiptsBase(ABC): @@ -12,35 +15,47 @@ class ReceiptsBase(ABC): """ @abstractmethod - def get_completed_job_receipts_for_block_range( - self, start_block: int, end_block: int - ) -> list[Receipt]: + async def run_receipts_transfer( + self, + daemon: bool, + debug_miner_hotkey: str | None, + debug_miner_ip: str | None, + debug_miner_port: int | None, + ) -> None: """ - Get all receipts for jobs that were completed between the specified blocks. + Run the receipts transfer loop (or a single iteration). Args: - start_block: Start block (inclusive) - end_block: End block (exclusive) - - Returns: - List of receipts for completed jobs in the block range + daemon: If True, run indefinitely; otherwise perform a single transfer + debug_miner_hotkey: Explicit miner hotkey to fetch from (debug only) + debug_miner_ip: Explicit miner IP to fetch from (debug only) + debug_miner_port: Explicit miner port to fetch from (debug only) """ pass @abstractmethod - async def scrape_receipts_from_miners( - self, miner_hotkeys: list[str], start_block: int, end_block: int - ) -> list[Receipt]: + def create_job_started_receipt( + self, + job_uuid: str, + miner_hotkey: str, + validator_hotkey: str, + executor_class: str, + is_organic: bool, + ttl: int, + ) -> tuple[JobStartedReceiptPayload, str]: """ - Scrape receipts from specified miners within a block range. + Create a job started receipt. Args: - miner_hotkeys: List of miner hotkeys to scrape receipts from - start_block: Start block (inclusive) - if None, scrapes all available receipts - end_block: End block (exclusive) - if None, scrapes all available receipts + job_uuid: UUID of the job + miner_hotkey: Hotkey of the miner + validator_hotkey: Hotkey of the validator + executor_class: Executor class for the job + is_organic: Whether the job is organic + ttl: Time to live for the receipt Returns: - List of scraped receipts + Tuple of (payload, signature) """ pass @@ -50,32 +65,126 @@ def create_job_finished_receipt( job_uuid: str, miner_hotkey: str, validator_hotkey: str, - time_started: int, + time_started: datetime.datetime, time_took_us: int, score_str: str, ) -> JobFinishedReceipt: """ - Create a job finished receipt for a completed job. + Create a job finished receipt. 
Args: job_uuid: UUID of the job miner_hotkey: Hotkey of the miner validator_hotkey: Hotkey of the validator - time_started: Timestamp when job started - time_took_us: Time taken in microseconds - score_str: Score as string + time_started: When the job started + time_took_us: How long the job took in microseconds + score_str: Score string for the job + + Returns: + Created JobFinishedReceipt + """ + pass + + @abstractmethod + async def get_valid_job_started_receipts_for_miner( + self, miner_hotkey: str, at_time: datetime.datetime + ) -> list[JobStartedReceipt]: + """ + Get valid job started receipts for a miner at a specific time. + + Args: + miner_hotkey: Hotkey of the miner + at_time: Time to check validity at + + Returns: + List of valid JobStartedReceipt objects + """ + pass + + @abstractmethod + async def get_job_finished_receipts_for_miner( + self, miner_hotkey: str, job_uuids: list[str] + ) -> list[JobFinishedReceipt]: + """ + Get job finished receipts for a miner and specific job UUIDs. + + Args: + miner_hotkey: Hotkey of the miner + job_uuids: List of job UUIDs to get receipts for + + Returns: + List of JobFinishedReceipt objects + """ + pass + + @abstractmethod + async def get_job_started_receipt_by_uuid(self, job_uuid: str) -> JobStartedReceipt | None: + """ + Get a job started receipt by UUID. + + Args: + job_uuid: UUID of the job Returns: - Created job finished receipt + JobStartedReceipt if found, None otherwise """ pass @abstractmethod - def save_receipt(self, receipt: Receipt) -> None: + async def get_completed_job_receipts_for_block_range( + self, start_block: int, end_block: int + ) -> list[Receipt]: """ - Save a receipt to the database. + Get all receipts for jobs that were completed between the specified blocks. Args: - receipt: Receipt to save + start_block: Start block (inclusive) + end_block: End block (exclusive) + + Returns: + List of receipts for completed jobs in the block range + """ + pass + + @abstractmethod + async def _transfer_receipts_from_miners( + self, + miner_hotkeys: list[str], + pages: list[int], + semaphore_limit: int = 50, + request_timeout: float = 3.0, + ) -> TransferResult: + """ + Private helper: transfer receipts from miners for specific pages. + """ + pass + + @abstractmethod + async def _run_full_transfer_cycle( + self, + miner_hotkeys: list[str], + cutoff_hours: int = 5, + n_active_pages: int = 2, + active_semaphore_limit: int = 50, + catchup_semaphore_limit: int = 10, + active_timeout: float = 1.0, + catchup_timeout: float = 3.0, + ) -> tuple[TransferResult, TransferResult]: + """ + Private helper: run a full transfer cycle for tests or internal orchestration. + """ + pass + + @abstractmethod + async def _fetch_receipts_for_range(self, start_block: int, end_block: int) -> list[Receipt]: + """ + Fetch receipts for a block range from local storage. 
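+        In the default implementation, block numbers are resolved to timestamps via
+        stored Block records, with a fallback to querying the chain.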
+ + Args: + start_block: Start block (inclusive) + end_block: End block (exclusive) + + Returns: + List of receipts for the block range """ pass diff --git a/validator/app/src/compute_horde_validator/validator/receipts/default.py b/validator/app/src/compute_horde_validator/validator/receipts/default.py index 48003a62e..0a355016c 100644 --- a/validator/app/src/compute_horde_validator/validator/receipts/default.py +++ b/validator/app/src/compute_horde_validator/validator/receipts/default.py @@ -1,116 +1,615 @@ +import asyncio import datetime import logging +import time +from dataclasses import dataclass +import aiohttp +from asgiref.sync import sync_to_async from compute_horde.receipts import Receipt -from compute_horde.receipts.models import JobFinishedReceipt, receipt_to_django_model -from compute_horde.receipts.schemas import JobFinishedReceiptPayload +from compute_horde.receipts.models import ( + JobFinishedReceipt, + JobStartedReceipt, +) +from compute_horde.receipts.schemas import JobFinishedReceiptPayload, JobStartedReceiptPayload +from compute_horde.receipts.store.local import N_ACTIVE_PAGES, LocalFilesystemPagedReceiptStore +from compute_horde.receipts.transfer import ReceiptsTransfer, TransferResult +from compute_horde.utils import sign_blob +from django.conf import settings +from django.utils import timezone +from prometheus_client import Counter, Gauge, Histogram -from compute_horde_validator.validator.receipts.exceptions import ( +from compute_horde_validator.validator.allowance.utils.supertensor import supertensor +from compute_horde_validator.validator.dynamic_config import aget_config +from compute_horde_validator.validator.models import MetagraphSnapshot, Miner +from compute_horde_validator.validator.models.allowance.internal import Block +from compute_horde_validator.validator.receipts.base import ReceiptsBase +from compute_horde_validator.validator.receipts.types import ( ReceiptsGenerationError, ) -from compute_horde_validator.validator.receipts.interface import ReceiptsBase logger = logging.getLogger(__name__) +class _TransferIsDisabled(Exception): + pass + + +@dataclass +class _Metrics: + receipts: Counter + miners: Gauge + successful_transfers: Counter + line_errors: Counter + transfer_errors: Counter + transfer_duration: Histogram + catchup_pages_left: Gauge + + class Receipts(ReceiptsBase): """ Default implementation of receipts manager. """ - def get_completed_job_receipts_for_block_range( - self, start_block: int, end_block: int - ) -> list[Receipt]: - """ - Get all receipts for jobs that were completed between the specified blocks. + async def run_receipts_transfer( + self, + daemon: bool, + debug_miner_hotkey: str | None, + debug_miner_ip: str | None, + debug_miner_port: int | None, + ) -> None: + metrics = _Metrics( + receipts=Counter( + "receipttransfer_receipts_total", documentation="Number of transferred receipts" + ), + miners=Gauge( + "receipttransfer_miners", documentation="Number of miners to transfer from" + ), + successful_transfers=Counter( + "receipttransfer_successful_transfers_total", + documentation="Number of transfers that didn't explicitly fail. 
(this includes 404s though)", + ), + line_errors=Counter( + "receipttransfer_line_errors_total", + labelnames=["exc_type"], + documentation="Number of invalid lines in received pages", + ), + transfer_errors=Counter( + "receipttransfer_transfer_errors_total", + labelnames=["exc_type"], + documentation="Number of completely failed page transfers", + ), + transfer_duration=Histogram( + "receipttransfer_transfer_duration", + documentation="Total time to transfer latest page deltas from all miners", + ), + catchup_pages_left=Gauge( + "receipttransfer_catchup_pages_left", + documentation="Pages waiting for catch-up", + ), + ) - Args: - start_block: Start block (inclusive) - end_block: End block (exclusive) + mode, explicit_miner = await self._determine_miners_mode( + debug_miner_hotkey, debug_miner_ip, debug_miner_port + ) + cutoff = timezone.now() - datetime.timedelta(hours=5) - Returns: - List of receipts for completed jobs in the block range - """ - # TODO: Implement block-based filtering - # For now, return all job finished receipts - finished_receipts = list(JobFinishedReceipt.objects.all()) + if daemon: + while True: + try: + await self._run_in_loop(cutoff, mode, explicit_miner, metrics) + except _TransferIsDisabled: + logger.info("Transfer is currently disabled. Sleeping for a minute.") + await asyncio.sleep(60) + else: + await self._run_once(cutoff, mode, explicit_miner, metrics) - # Convert to Receipt objects - receipts = [] - for receipt in finished_receipts: - receipts.append(receipt.to_receipt()) + async def _determine_miners_mode( + self, + debug_miner_hotkey: str | None, + debug_miner_ip: str | None, + debug_miner_port: int | None, + ) -> tuple[str, tuple[str, str, int] | None]: + if (debug_miner_hotkey, debug_miner_ip, debug_miner_port) != (None, None, None): + if None in {debug_miner_hotkey, debug_miner_ip, debug_miner_port}: + raise ValueError("Either none or all of explicit miner details must be provided") + # All values are guaranteed non-None here due to the check above. + assert debug_miner_hotkey is not None + assert debug_miner_ip is not None + assert debug_miner_port is not None + miner: tuple[str, str, int] = ( + debug_miner_hotkey, + debug_miner_ip, + int(debug_miner_port), + ) + logger.info(f"Will fetch receipts from explicit miner: {list(miner)}") + return "explicit", miner + if settings.DEBUG_FETCH_RECEIPTS_FROM_MINERS: + debug_miners = settings.DEBUG_FETCH_RECEIPTS_FROM_MINERS + logger.info(f"Will fetch receipts from {len(debug_miners)} debug miners") + return "debug_settings", None + logger.info("Will fetch receipts from metagraph snapshot miners") + return "metagraph", None - return receipts + async def _list_miners( + self, mode: str, explicit_miner: tuple[str, str, int] | None + ) -> list[tuple[str, str, int]]: + if mode == "explicit": + assert explicit_miner is not None + return [explicit_miner] + if mode == "debug_settings": + return settings.DEBUG_FETCH_RECEIPTS_FROM_MINERS + # metagraph mode + snapshot = await MetagraphSnapshot.aget_latest() + serving_hotkeys = snapshot.serving_hotkeys + serving_miners = [m async for m in Miner.objects.filter(hotkey__in=serving_hotkeys)] + return [(m.hotkey, m.address, m.port) for m in serving_miners] - async def scrape_receipts_from_miners( - self, miner_hotkeys: list[str], start_block: int, end_block: int - ) -> list[Receipt]: - """ - Scrape receipts from specified miners. 
+    async def _throw_if_disabled(self) -> None:
+        try:
+            if await aget_config("DYNAMIC_RECEIPT_TRANSFER_ENABLED"):
+                return
+        except KeyError:
+            logger.warning("DYNAMIC_RECEIPT_TRANSFER_ENABLED dynamic config is not set up!")
+        raise _TransferIsDisabled
+
+    def _push_common_metrics(self, result: TransferResult, metrics: _Metrics) -> None:
+        from collections import defaultdict
+
+        n_line_errors: defaultdict[type[Exception], int] = defaultdict(int)
+        for line_error in result.line_errors:
+            n_line_errors[type(line_error)] += 1
+        for exc_type, exc_count in n_line_errors.items():
+            metrics.line_errors.labels(exc_type=exc_type.__name__).inc(exc_count)
+
+        n_transfer_errors: defaultdict[type[Exception], int] = defaultdict(int)
+        for transfer_error in result.transfer_errors:
+            n_transfer_errors[type(transfer_error)] += 1
+        for exc_type, exc_count in n_transfer_errors.items():
+            metrics.transfer_errors.labels(exc_type=exc_type.__name__).inc(exc_count)
+
+        metrics.receipts.inc(result.n_receipts)
+        metrics.successful_transfers.inc(result.n_successful_transfers)
+
+    async def _catch_up(
+        self,
+        pages: list[int],
+        mode: str,
+        explicit_miner: tuple[str, str, int] | None,
+        session: aiohttp.ClientSession,
+        semaphore: asyncio.Semaphore,
+        metrics: _Metrics,
+    ) -> None:
+        for idx, page in enumerate(pages):
+            await self._throw_if_disabled()
+
+            metrics.catchup_pages_left.set(len(pages) - idx)
+            start_time = time.monotonic()
+            current_loop_miners = await self._list_miners(mode, explicit_miner)
+            result = await ReceiptsTransfer.transfer(
+                miners=current_loop_miners,
+                pages=[page],
+                session=session,
+                semaphore=semaphore,
+                request_timeout=3.0,
+            )
+            elapsed = time.monotonic() - start_time
+
+            logger.info(
+                f"Catching up: "
+                f"{page=} ({idx + 1}/{len(pages)}) "
+                f"receipts={result.n_receipts} "
+                f"{elapsed=:.3f} "
+                f"successful_transfers={result.n_successful_transfers} "
+                f"transfer_errors={len(result.transfer_errors)} "
+                f"line_errors={len(result.line_errors)} "
+            )
+
+            self._push_common_metrics(result, metrics)
+        metrics.catchup_pages_left.set(0)
+
+    async def _keep_up(
+        self,
+        mode: str,
+        explicit_miner: tuple[str, str, int] | None,
+        session: aiohttp.ClientSession,
+        semaphore: asyncio.Semaphore,
+        metrics: _Metrics,
+    ) -> None:
+        while True:
+            await self._throw_if_disabled()
+            interval: int = await aget_config("DYNAMIC_RECEIPT_TRANSFER_INTERVAL")
-        Args:
-            miner_hotkeys: List of miner hotkeys to scrape receipts from
+            start_time = time.monotonic()
+            current_page = LocalFilesystemPagedReceiptStore.current_page()
+            pages = list(reversed(range(current_page - N_ACTIVE_PAGES + 1, current_page + 1)))
+            current_loop_miners = await self._list_miners(mode, explicit_miner)
+            result = await ReceiptsTransfer.transfer(
+                miners=current_loop_miners,
+                pages=pages,
+                session=session,
+                semaphore=semaphore,
+                request_timeout=1.0,
+            )
+            elapsed = time.monotonic() - start_time
+
+            logger.info(
+                f"Keeping up: "
+                f"{pages=} "
+                f"receipts={result.n_receipts} "
+                f"{elapsed=:.3f} "
+                f"successful_transfers={result.n_successful_transfers} "
+                f"transfer_errors={len(result.transfer_errors)} "
+                f"line_errors={len(result.line_errors)} "
+            )
+
+            self._push_common_metrics(result, metrics)
+            metrics.miners.set(len(current_loop_miners))
+            metrics.transfer_duration.observe(elapsed)
+
+            if elapsed < interval:
+                # Sleep without blocking the event loop: _keep_up runs concurrently
+                # with the slow catch-up task via asyncio.gather.
+                await asyncio.sleep(interval - elapsed)
+
+    async def _run_once(
+        self,
+        cutoff_ts: datetime.datetime,
+        mode: str,
+        explicit_miner: tuple[str, str, int] | None,
+        metrics: _Metrics,
+    ) -> None:
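+        # One-shot mode: pull every page from the catch-up cutoff up to the current
+        # page, newest first, then return.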
catchup_cutoff_page = LocalFilesystemPagedReceiptStore.current_page_at(cutoff_ts) + current_page = LocalFilesystemPagedReceiptStore.current_page() + async with aiohttp.ClientSession() as session: + await self._catch_up( + pages=list(reversed(range(catchup_cutoff_page, current_page + 1))), + mode=mode, + explicit_miner=explicit_miner, + session=session, + semaphore=asyncio.Semaphore(50), + metrics=metrics, + ) + + async def _run_in_loop( + self, + cutoff_ts: datetime.datetime, + mode: str, + explicit_miner: tuple[str, str, int] | None, + metrics: _Metrics, + ) -> None: + catchup_cutoff_page = LocalFilesystemPagedReceiptStore.current_page_at(cutoff_ts) + current_page = LocalFilesystemPagedReceiptStore.current_page() + async with aiohttp.ClientSession() as session: + await self._catch_up( + pages=list(reversed(range(current_page - N_ACTIVE_PAGES + 1, current_page + 1))), + mode=mode, + explicit_miner=explicit_miner, + session=session, + semaphore=asyncio.Semaphore(50), + metrics=metrics, + ) + await asyncio.gather( + self._catch_up( + pages=list( + reversed(range(catchup_cutoff_page, current_page - N_ACTIVE_PAGES + 1)) + ), + mode=mode, + explicit_miner=explicit_miner, + session=session, + semaphore=asyncio.Semaphore(10), + metrics=metrics, + ), + self._keep_up( + mode=mode, + explicit_miner=explicit_miner, + session=session, + semaphore=asyncio.Semaphore(50), + metrics=metrics, + ), + ) + + async def _transfer_receipts_from_miners( + self, + miner_hotkeys: list[str], + pages: list[int], + semaphore_limit: int = 50, + request_timeout: float = 3.0, + ) -> TransferResult: + if not miner_hotkeys or not pages: + return TransferResult(0, 0, [], []) + + miners = await self._fetch_miners(miner_hotkeys) + miner_infos: list[tuple[str, str, int]] = [ + (m[0], m[1], m[2]) for m in miners if m[1] and m[2] and m[1] != "0.0.0.0" + ] + if not miner_infos: + return TransferResult(0, 0, [], []) + + semaphore = asyncio.Semaphore(semaphore_limit) + async with aiohttp.ClientSession() as session: + return await ReceiptsTransfer.transfer( + miners=miner_infos, + pages=pages, + session=session, + semaphore=semaphore, + request_timeout=request_timeout, + ) - Returns: - List of scraped receipts - """ - # TODO: Implement actual scraping logic - logger.info(f"Scraping receipts from {len(miner_hotkeys)} miners") - return [] + async def _run_full_transfer_cycle( + self, + miner_hotkeys: list[str], + cutoff_hours: int = 5, + n_active_pages: int = 2, + active_semaphore_limit: int = 50, + catchup_semaphore_limit: int = 10, + active_timeout: float = 1.0, + catchup_timeout: float = 3.0, + ) -> tuple[TransferResult, TransferResult]: + # Compute page windows + cutoff_ts = timezone.now() - datetime.timedelta(hours=cutoff_hours) + catchup_cutoff_page = LocalFilesystemPagedReceiptStore.current_page_at(cutoff_ts) + current_page = LocalFilesystemPagedReceiptStore.current_page() + + active_pages = list(reversed(range(current_page - n_active_pages + 1, current_page + 1))) + catchup_pages = list( + reversed( + range( + catchup_cutoff_page, max(catchup_cutoff_page, current_page - n_active_pages + 1) + ) + ) + ) + + miners = await self._fetch_miners(miner_hotkeys) + miner_infos: list[tuple[str, str, int]] = [ + (m[0], m[1], m[2]) for m in miners if m[1] and m[2] and m[1] != "0.0.0.0" + ] + if not miner_infos: + return TransferResult(0, 0, [], []), TransferResult(0, 0, [], []) + + async with aiohttp.ClientSession() as session: + active_result = await ReceiptsTransfer.transfer( + miners=miner_infos, + pages=active_pages, + session=session, 
+ semaphore=asyncio.Semaphore(active_semaphore_limit), + request_timeout=active_timeout, + ) + catchup_result = await ReceiptsTransfer.transfer( + miners=miner_infos, + pages=catchup_pages, + session=session, + semaphore=asyncio.Semaphore(catchup_semaphore_limit), + request_timeout=catchup_timeout, + ) + + return active_result, catchup_result def create_job_finished_receipt( self, job_uuid: str, miner_hotkey: str, validator_hotkey: str, - time_started: int, + time_started: datetime.datetime, time_took_us: int, score_str: str, ) -> JobFinishedReceipt: - """ - Create a job finished receipt for a completed job. - - Args: - job_uuid: UUID of the job - miner_hotkey: Hotkey of the miner - validator_hotkey: Hotkey of the validator - time_started: Timestamp when job started - time_took_us: Time taken in microseconds - score_str: Score as string - - Returns: - Created job finished receipt - """ + payload = JobFinishedReceiptPayload( + job_uuid=job_uuid, + miner_hotkey=miner_hotkey, + validator_hotkey=validator_hotkey, + timestamp=datetime.datetime.now(datetime.UTC), + time_started=time_started, + time_took_us=time_took_us, + score_str=score_str, + ) + + validator_kp = settings.BITTENSOR_WALLET().get_hotkey() + validator_signature = sign_blob(validator_kp, payload.blob_for_signing()) + + return JobFinishedReceipt( + job_uuid=job_uuid, + miner_hotkey=miner_hotkey, + validator_hotkey=validator_hotkey, + validator_signature=validator_signature, + timestamp=payload.timestamp, + time_started=time_started, + time_took_us=time_took_us, + score_str=score_str, + ) + + def create_job_started_receipt( + self, + job_uuid: str, + miner_hotkey: str, + validator_hotkey: str, + executor_class: str, + is_organic: bool, + ttl: int, + ) -> tuple[JobStartedReceiptPayload, str]: try: - payload = JobFinishedReceiptPayload( + payload = JobStartedReceiptPayload( job_uuid=job_uuid, miner_hotkey=miner_hotkey, validator_hotkey=validator_hotkey, - timestamp=datetime.datetime.now(), - time_started=datetime.datetime.fromtimestamp(time_started), - time_took_us=time_took_us, - score_str=score_str, + timestamp=datetime.datetime.now(datetime.UTC), + executor_class=executor_class, + is_organic=is_organic, + ttl=ttl, ) - # TODO: Add proper signature generation - validator_signature = "placeholder_signature" + validator_kp = settings.BITTENSOR_WALLET().get_hotkey() + validator_signature = sign_blob(validator_kp, payload.blob_for_signing()) + + logger.debug( + "Created JobStartedReceipt payload for job %s (miner: %s, validator: %s, organic: %s)", + job_uuid, + miner_hotkey, + validator_hotkey, + is_organic, + ) + + return payload, validator_signature + + except Exception as e: + raise ReceiptsGenerationError(f"Failed to create job started receipt: {e}") from e + + async def get_valid_job_started_receipts_for_miner( + self, miner_hotkey: str, at_time: datetime.datetime + ) -> list[JobStartedReceipt]: + try: + + def _query() -> list[JobStartedReceipt]: + qs = JobStartedReceipt.objects.valid_at(at_time).filter(miner_hotkey=miner_hotkey) + return list(qs.all()) + + receipts: list[JobStartedReceipt] = await sync_to_async(_query, thread_sensitive=True)() + + logger.debug( + "Retrieved %s valid job started receipts for miner %s at %s", + len(receipts), + miner_hotkey, + at_time, + ) - receipt = JobFinishedReceipt.from_payload(payload, validator_signature) + return receipts - return receipt except Exception as e: - raise ReceiptsGenerationError(f"Failed to create job finished receipt: {e}") from e + logger.error("Failed to get valid job started 
receipts for miner: %s", e) + return [] + + async def get_job_finished_receipts_for_miner( + self, miner_hotkey: str, job_uuids: list[str] + ) -> list[JobFinishedReceipt]: + try: + if not job_uuids: + return [] + + def _query() -> list[JobFinishedReceipt]: + qs = JobFinishedReceipt.objects.filter( + miner_hotkey=miner_hotkey, job_uuid__in=job_uuids + ) + return list(qs.all()) - def save_receipt(self, receipt: Receipt) -> None: - """ - Save a receipt to the database. + receipts: list[JobFinishedReceipt] = await sync_to_async( + _query, thread_sensitive=True + )() - Args: - receipt: Receipt to save - """ + logger.debug( + "Retrieved %s job finished receipts for miner %s (jobs: %s)", + len(receipts), + miner_hotkey, + len(job_uuids), + ) + + return receipts + + except Exception as e: + logger.error("Failed to get job finished receipts for miner: %s", e) + return [] + + async def get_job_started_receipt_by_uuid(self, job_uuid: str) -> JobStartedReceipt | None: try: - django_model = receipt_to_django_model(receipt) - django_model.save() - logger.info(f"Saved receipt for job {receipt.payload.job_uuid}") + django_receipt = await sync_to_async( + JobStartedReceipt.objects.get, thread_sensitive=True + )(job_uuid=job_uuid) + logger.debug( + "Retrieved JobStartedReceipt for job %s (miner: %s, validator: %s)", + job_uuid, + django_receipt.miner_hotkey, + django_receipt.validator_hotkey, + ) + return django_receipt + except JobStartedReceipt.DoesNotExist: + logger.debug("No JobStartedReceipt found for job %s", job_uuid) + return None except Exception as e: - raise ReceiptsGenerationError(f"Failed to save receipt: {e}") from e + logger.error("Failed to get JobStartedReceipt for job %s: %s", job_uuid, e) + return None + + async def get_completed_job_receipts_for_block_range( + self, start_block: int, end_block: int + ) -> list[Receipt]: + if start_block >= end_block: + logger.warning( + "Invalid block range provided: start_block (%s) >= end_block (%s)", + start_block, + end_block, + ) + return [] + + try: + start_timestamp = await self._get_block_timestamp(start_block) + end_timestamp = await self._get_block_timestamp(end_block) + + finished_receipts_qs = JobFinishedReceipt.objects.filter( + timestamp__gte=start_timestamp, + timestamp__lt=end_timestamp, + ) + receipts: list[Receipt] = [] + async for django_receipt in finished_receipts_qs: + receipts.append(django_receipt.to_receipt()) + + logger.info( + "Found %s completed job receipts for blocks %s-%s", + len(receipts), + start_block, + end_block, + ) + return receipts + except Exception as ex: + logger.error( + "Failed to list receipts for block range %s-%s: %s", + start_block, + end_block, + ex, + ) + return [] + + async def _fetch_miners(self, hotkeys: list[str]) -> list[tuple[str, str, int]]: + """Fetch miner endpoints (hotkey, address, port) for given hotkeys.""" + + def _query() -> list[tuple[str, str, int]]: + return list( + Miner.objects.filter(hotkey__in=hotkeys).values_list("hotkey", "address", "port") + ) + + return await sync_to_async(_query, thread_sensitive=True)() + + async def _fetch_receipts_for_range(self, start_block: int, end_block: int) -> list[Receipt]: + """Fetch JobFinished receipts for blocks in [start_block, end_block).""" + + start_ts = await self._get_block_timestamp(start_block) + end_ts = await self._get_block_timestamp(end_block) + + receipts_qs = JobFinishedReceipt.objects.filter( + timestamp__gte=start_ts, + timestamp__lt=end_ts, + ) + receipts: list[Receipt] = [] + async for receipt_data in receipts_qs: + 
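+            # Re-hydrate each stored Django row into the wire-format Receipt schema.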
receipts.append(receipt_data.to_receipt()) + return receipts + + async def _get_block_timestamp(self, block_number: int) -> datetime.datetime: + try: + block = await Block.objects.aget(block_number=block_number) + return block.creation_timestamp + except Exception as db_ex: + logger.debug( + "Block %s not found in DB or DB error occurred: %s", + block_number, + db_ex, + ) + + try: + ts = supertensor().get_block_timestamp(block_number) + if isinstance(ts, datetime.datetime): + return ts + else: + raise ValueError(f"Expected datetime, got {type(ts)}") + except Exception as chain_ex: # noqa: BLE001 - broad to surface upstream + logger.warning( + "Failed to resolve timestamp for block %s via chain: %s", + block_number, + chain_ex, + ) + raise diff --git a/validator/app/src/compute_horde_validator/validator/receipts/tests/test_receipts.py b/validator/app/src/compute_horde_validator/validator/receipts/tests/test_receipts.py new file mode 100644 index 000000000..e3b14291c --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/receipts/tests/test_receipts.py @@ -0,0 +1,426 @@ +import datetime +import uuid + +import bittensor_wallet +import pytest +from aiohttp import web +from asgiref.sync import sync_to_async +from compute_horde.receipts.models import JobAcceptedReceipt, JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, + JobFinishedReceiptPayload, + JobStartedReceiptPayload, + Receipt, +) +from compute_horde.utils import sign_blob +from django.utils import timezone + +from compute_horde_validator.validator.models import Miner +from compute_horde_validator.validator.models.allowance.internal import Block +from compute_horde_validator.validator.receipts import Receipts + + +@pytest.mark.asyncio +@pytest.mark.django_db(transaction=True) +async def test_transfer_receipts_from_miners_happy_path(settings): + settings.RECEIPT_TRANSFER_CHECKPOINT_CACHE = "default" + + miner_kp = bittensor_wallet.Keypair.create_from_mnemonic( + "almost fatigue race slim picnic mass better clog deal solve already champion" + ) + miner_hotkey = miner_kp.ss58_address + validator_kp = settings.BITTENSOR_WALLET().get_hotkey() + + started_payload = JobStartedReceiptPayload( + job_uuid=str(uuid.uuid4()), + miner_hotkey=miner_hotkey, + validator_hotkey=validator_kp.ss58_address, + timestamp=datetime.datetime.now(datetime.UTC), + executor_class="always_on.gpu-24gb", + is_organic=True, + ttl=300, + ) + started_blob = started_payload.blob_for_signing() + started_receipt = Receipt( + payload=started_payload, + validator_signature=sign_blob(validator_kp, started_blob), + miner_signature=sign_blob(miner_kp, started_blob), + ) + + accepted_payload = JobAcceptedReceiptPayload( + job_uuid=str(uuid.uuid4()), + miner_hotkey=miner_hotkey, + validator_hotkey=validator_kp.ss58_address, + timestamp=datetime.datetime.now(datetime.UTC), + time_accepted=datetime.datetime.now(datetime.UTC), + ttl=123, + ) + accepted_blob = accepted_payload.blob_for_signing() + accepted_receipt = Receipt( + payload=accepted_payload, + validator_signature=sign_blob(validator_kp, accepted_blob), + miner_signature=sign_blob(miner_kp, accepted_blob), + ) + + finished_payload = JobFinishedReceiptPayload( + job_uuid=str(uuid.uuid4()), + miner_hotkey=miner_hotkey, + validator_hotkey=validator_kp.ss58_address, + timestamp=datetime.datetime.now(datetime.UTC), + time_started=datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=5), + time_took_us=42, + score_str="0.5", + ) + 
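+    # Sign the finished payload with both keys, as for the started/accepted receipts above.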
finished_blob = finished_payload.blob_for_signing() + finished_receipt = Receipt( + payload=finished_payload, + validator_signature=sign_blob(validator_kp, finished_blob), + miner_signature=sign_blob(miner_kp, finished_blob), + ) + + jsonl_body = ( + started_receipt.model_dump_json() + + "\n" + + accepted_receipt.model_dump_json() + + "\n" + + finished_receipt.model_dump_json() + + "\n" + ) + + app = web.Application() + state = {"body": jsonl_body.encode("utf-8")} + + async def handler(request: web.Request): + rng = request.headers.get("Range") + if rng: + return web.Response(status=416) + return web.Response(status=200, body=state["body"], content_type="application/jsonl") + + app.router.add_get("/receipts/{page}.jsonl", handler) + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, "127.0.0.1", 0) + await site.start() + server = getattr(site, "_server", None) + assert server is not None and server.sockets, "Server failed to start" + port = server.sockets[0].getsockname()[1] + + try: + await sync_to_async(Miner.objects.create, thread_sensitive=True)( + hotkey=miner_hotkey, address="127.0.0.1", port=port + ) + + receipts_mgr = Receipts() + page = 123456 + result = await receipts_mgr._transfer_receipts_from_miners( + miner_hotkeys=[miner_hotkey], pages=[page], semaphore_limit=2, request_timeout=2.0 + ) + + assert result.n_receipts == 3 + assert result.n_successful_transfers == 1 + assert result.transfer_errors == [] + assert result.line_errors == [] + + stored_started = await sync_to_async( + lambda: JobStartedReceipt.objects.get(job_uuid=started_payload.job_uuid), + thread_sensitive=True, + )() + assert str(stored_started.job_uuid) == started_payload.job_uuid + assert stored_started.miner_hotkey == started_payload.miner_hotkey + assert stored_started.executor_class == "always_on.gpu-24gb" + assert stored_started.is_organic is True + assert stored_started.ttl == 300 + assert ( + isinstance(stored_started.validator_signature, str) + and stored_started.validator_signature + ) + assert isinstance(stored_started.miner_signature, str) and stored_started.miner_signature + + stored_accepted = await sync_to_async( + lambda: JobAcceptedReceipt.objects.get(job_uuid=accepted_payload.job_uuid), + thread_sensitive=True, + )() + assert str(stored_accepted.job_uuid) == accepted_payload.job_uuid + assert stored_accepted.miner_hotkey == accepted_payload.miner_hotkey + assert stored_accepted.ttl == 123 + assert ( + isinstance(stored_accepted.validator_signature, str) + and stored_accepted.validator_signature + ) + assert isinstance(stored_accepted.miner_signature, str) and stored_accepted.miner_signature + + stored_finished = await sync_to_async( + lambda: JobFinishedReceipt.objects.get(job_uuid=finished_payload.job_uuid), + thread_sensitive=True, + )() + assert str(stored_finished.job_uuid) == finished_payload.job_uuid + assert stored_finished.miner_hotkey == finished_payload.miner_hotkey + assert stored_finished.time_took_us == 42 + assert stored_finished.score_str == "0.5" + assert ( + isinstance(stored_finished.validator_signature, str) + and stored_finished.validator_signature + ) + assert isinstance(stored_finished.miner_signature, str) and stored_finished.miner_signature + + finally: + await runner.cleanup() + + +@pytest.mark.django_db(transaction=True) +def test_create_job_started_receipt_returns_payload_and_signature(settings): + receipts = Receipts() + + job_uuid = str(uuid.uuid4()) + miner_hotkey = "miner_hotkey_1" + validator_hotkey = 
settings.BITTENSOR_WALLET().get_hotkey().ss58_address + executor_class = "always_on.gpu-24gb" + is_organic = True + ttl = 300 + + payload, signature = receipts.create_job_started_receipt( + job_uuid=job_uuid, + miner_hotkey=miner_hotkey, + validator_hotkey=validator_hotkey, + executor_class=executor_class, + is_organic=is_organic, + ttl=ttl, + ) + + assert isinstance(signature, str) and len(signature) > 0 + assert payload.job_uuid == job_uuid + assert payload.miner_hotkey == miner_hotkey + assert payload.validator_hotkey == validator_hotkey + assert payload.executor_class == executor_class + assert payload.is_organic is is_organic + assert payload.ttl == ttl + assert payload.timestamp.tzinfo is datetime.UTC + + +@pytest.mark.django_db(transaction=True) +def test_create_job_finished_receipt_returns_expected_values(settings): + receipts = Receipts() + + job_uuid = str(uuid.uuid4()) + miner_hotkey = "miner_hotkey_2" + validator_hotkey = settings.BITTENSOR_WALLET().get_hotkey().ss58_address + time_started = datetime.datetime.now(datetime.UTC) - datetime.timedelta(seconds=5) + time_took_us = 1_234_567 + score_str = "0.987" + + finished = receipts.create_job_finished_receipt( + job_uuid=job_uuid, + miner_hotkey=miner_hotkey, + validator_hotkey=validator_hotkey, + time_started=time_started, + time_took_us=time_took_us, + score_str=score_str, + ) + + assert finished.job_uuid == job_uuid + assert finished.miner_hotkey == miner_hotkey + assert finished.validator_hotkey == validator_hotkey + assert finished.time_started == time_started + assert finished.time_took_us == time_took_us + assert finished.score_str == score_str + assert isinstance(finished.validator_signature, str) and len(finished.validator_signature) > 0 + assert ( + isinstance(finished.timestamp, datetime.datetime) + and finished.timestamp.tzinfo is datetime.UTC + ) + + +@pytest.mark.asyncio +@pytest.mark.django_db(transaction=True) +async def test_get_valid_job_started_receipts_for_miner_filters_correctly(settings): + miner_hotkey = "miner_hotkey_valid" + other_miner = "miner_hotkey_other" + validator_hotkey = settings.BITTENSOR_WALLET().get_hotkey().ss58_address + + base_ts = datetime.datetime.now(datetime.UTC) + + await JobStartedReceipt.objects.acreate( + job_uuid=str(uuid.uuid4()), + miner_hotkey=miner_hotkey, + validator_hotkey=validator_hotkey, + validator_signature="sig", + timestamp=base_ts - datetime.timedelta(seconds=10), + executor_class="always_on.gpu-24gb", + is_organic=True, + ttl=60, + ) + + await JobStartedReceipt.objects.acreate( + job_uuid=str(uuid.uuid4()), + miner_hotkey=miner_hotkey, + validator_hotkey=validator_hotkey, + validator_signature="sig", + timestamp=base_ts - datetime.timedelta(minutes=10), + executor_class="always_on.gpu-24gb", + is_organic=False, + ttl=30, + ) + + await JobStartedReceipt.objects.acreate( + job_uuid=str(uuid.uuid4()), + miner_hotkey=other_miner, + validator_hotkey=validator_hotkey, + validator_signature="sig", + timestamp=base_ts - datetime.timedelta(seconds=5), + executor_class="always_on.gpu-24gb", + is_organic=True, + ttl=60, + ) + + receipts = Receipts() + results = await receipts.get_valid_job_started_receipts_for_miner( + miner_hotkey=miner_hotkey, at_time=base_ts + ) + + assert len(results) == 1 + r = results[0] + assert r.miner_hotkey == miner_hotkey + assert r.validator_hotkey == validator_hotkey + assert r.executor_class == "always_on.gpu-24gb" + assert r.is_organic is True + assert r.ttl == 60 + + +@pytest.mark.asyncio +@pytest.mark.django_db(transaction=True) +async def 
+    results = await receipts.get_valid_job_started_receipts_for_miner(
+        miner_hotkey=miner_hotkey, at_time=base_ts
+    )
+
+    assert len(results) == 1
+    r = results[0]
+    assert r.miner_hotkey == miner_hotkey
+    assert r.validator_hotkey == validator_hotkey
+    assert r.executor_class == "always_on.gpu-24gb"
+    assert r.is_organic is True
+    assert r.ttl == 60
+
+
+@pytest.mark.asyncio
+@pytest.mark.django_db(transaction=True)
+async def test_get_job_finished_receipts_for_miner_filters_by_uuid(settings):
+    miner_hotkey = "miner_hotkey_finished"
+    validator_hotkey = settings.BITTENSOR_WALLET().get_hotkey().ss58_address
+    common_ts = timezone.now()
+
+    wanted_uuid = str(uuid.uuid4())
+    other_uuid = str(uuid.uuid4())
+
+    await JobFinishedReceipt.objects.acreate(
+        job_uuid=wanted_uuid,
+        miner_hotkey=miner_hotkey,
+        validator_hotkey=validator_hotkey,
+        validator_signature="sig",
+        timestamp=common_ts,
+        time_started=common_ts - datetime.timedelta(seconds=2),
+        time_took_us=42,
+        score_str="0.5",
+    )
+
+    await JobFinishedReceipt.objects.acreate(
+        job_uuid=other_uuid,
+        miner_hotkey=miner_hotkey,
+        validator_hotkey=validator_hotkey,
+        validator_signature="sig",
+        timestamp=common_ts,
+        time_started=common_ts - datetime.timedelta(seconds=3),
+        time_took_us=43,
+        score_str="0.6",
+    )
+
+    receipts = Receipts()
+    results = await receipts.get_job_finished_receipts_for_miner(miner_hotkey, [wanted_uuid])
+
+    assert len(results) == 1
+    r = results[0]
+    assert str(r.job_uuid) == wanted_uuid
+    assert r.miner_hotkey == miner_hotkey
+    assert r.validator_hotkey == validator_hotkey
+    assert r.time_took_us == 42
+    assert r.score_str == "0.5"
+
+
+@pytest.mark.asyncio
+@pytest.mark.django_db(transaction=True)
+async def test_get_job_started_receipt_by_uuid_returns_instance_or_none(settings):
+    receipts = Receipts()
+    job_uuid_present = str(uuid.uuid4())
+    job_uuid_missing = str(uuid.uuid4())
+
+    await JobStartedReceipt.objects.acreate(
+        job_uuid=job_uuid_present,
+        miner_hotkey="miner_xyz",
+        validator_hotkey=settings.BITTENSOR_WALLET().get_hotkey().ss58_address,
+        validator_signature="sig",
+        timestamp=timezone.now(),
+        executor_class="always_on.gpu-24gb",
+        is_organic=True,
+        ttl=60,
+    )
+
+    found = await receipts.get_job_started_receipt_by_uuid(job_uuid_present)
+    missing = await receipts.get_job_started_receipt_by_uuid(job_uuid_missing)
+
+    assert found is not None
+    assert str(found.job_uuid) == job_uuid_present
+    assert missing is None
+
+
+@pytest.mark.asyncio
+@pytest.mark.django_db(transaction=True)
+async def test_get_completed_job_receipts_for_block_range_returns_only_in_range(settings):
+    receipts = Receipts()
+
+    # Setup block timestamps
+    start_block = 100
+    end_block = 105
+    start_ts = datetime.datetime.now(datetime.UTC)
+    end_ts = start_ts + datetime.timedelta(minutes=10)
+
+    await sync_to_async(Block.objects.create, thread_sensitive=True)(
+        block_number=start_block, creation_timestamp=start_ts
+    )
+    await sync_to_async(Block.objects.create, thread_sensitive=True)(
+        block_number=end_block, creation_timestamp=end_ts
+    )
+
+    miner_hotkey = "miner_hotkey_blockrange"
+    validator_hotkey = settings.BITTENSOR_WALLET().get_hotkey().ss58_address
+
+    in_uuid = str(uuid.uuid4())
+    await JobFinishedReceipt.objects.acreate(
+        job_uuid=in_uuid,
+        miner_hotkey=miner_hotkey,
+        validator_hotkey=validator_hotkey,
+        validator_signature="v_sig",
+        miner_signature="m_sig",
+        timestamp=start_ts + datetime.timedelta(minutes=5),
+        time_started=start_ts + datetime.timedelta(minutes=4),
+        time_took_us=1,
+        score_str="1.0",
+    )
+
+    await JobFinishedReceipt.objects.acreate(
+        job_uuid=str(uuid.uuid4()),
+        miner_hotkey=miner_hotkey,
+        validator_hotkey=validator_hotkey,
+        validator_signature="v_sig",
+        miner_signature="m_sig",
+        timestamp=start_ts - datetime.timedelta(seconds=1),
+        time_started=start_ts - datetime.timedelta(seconds=2),
+        time_took_us=2,
+        score_str="0.1",
+    )
+
+    await JobFinishedReceipt.objects.acreate(
+        job_uuid=str(uuid.uuid4()),
+        miner_hotkey=miner_hotkey,
+        validator_hotkey=validator_hotkey,
+        validator_signature="v_sig",
+        miner_signature="m_sig",
+        timestamp=end_ts,
+        time_started=end_ts - datetime.timedelta(seconds=2),
+        time_took_us=3,
+        score_str="0.2",
+    )
+
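+    # The expected window is [start_ts, end_ts): the receipt stamped exactly at
+    # end_ts and the one from before start_ts must both be excluded.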
+    receipts_list = await receipts.get_completed_job_receipts_for_block_range(
+        start_block, end_block
+    )
+
+    assert len(receipts_list) == 1
+    converted = receipts_list[0]
+    assert converted.payload.job_uuid == in_uuid
+    assert converted.payload.miner_hotkey == miner_hotkey
+    assert converted.payload.validator_hotkey == validator_hotkey
+    assert converted.payload.timestamp == start_ts + datetime.timedelta(minutes=5)
diff --git a/validator/app/src/compute_horde_validator/validator/receipts/types.py b/validator/app/src/compute_horde_validator/validator/receipts/types.py
index 8f9b8dd14..c2e94d09c 100644
--- a/validator/app/src/compute_horde_validator/validator/receipts/types.py
+++ b/validator/app/src/compute_horde_validator/validator/receipts/types.py
@@ -1,22 +1,4 @@
-class ReceiptsConfigurationError(Exception):
-    """Raised when there is a configuration error in the receipts module."""
-
-    pass
-
-
-class ReceiptsScrapingError(Exception):
-    """Raised when there is an error scraping receipts from miners."""
-
-    pass
-
-
 class ReceiptsGenerationError(Exception):
     """Raised when there is an error generating receipts."""

     pass
-
-
-class ReceiptsValidationError(Exception):
-    """Raised when there is an error validating receipts."""
-
-    pass
diff --git a/validator/app/src/compute_horde_validator/validator/routing/default.py b/validator/app/src/compute_horde_validator/validator/routing/default.py
index cbfb3c11f..ede76661b 100644
--- a/validator/app/src/compute_horde_validator/validator/routing/default.py
+++ b/validator/app/src/compute_horde_validator/validator/routing/default.py
@@ -9,7 +9,6 @@
     OrganicJobRequest,
     V2JobRequest,
 )
-from compute_horde.receipts.models import JobFinishedReceipt, JobStartedReceipt
 from compute_horde.subtensor import get_cycle_containing_block
 from compute_horde.utils import async_synchronized
 from django.conf import settings
@@ -25,6 +24,7 @@
     MinerManifest,
     MinerPreliminaryReservation,
 )
+from compute_horde_validator.validator.receipts.default import Receipts
 from compute_horde_validator.validator.routing.base import RoutingBase
 from compute_horde_validator.validator.routing.types import (
     AllMinersBusy,
@@ -218,20 +218,15 @@ async def _pick_miner_for_job_v2(request: V2JobRequest) -> JobRoute:
         .values_list("job_uuid", flat=True)
     }

-    known_started_jobs: set[str] = {
-        str(job_uuid)
-        async for job_uuid in JobStartedReceipt.objects.valid_at(timezone.now())
-        .filter(miner_hotkey=miner.hotkey)
-        .values_list("job_uuid", flat=True)
-    }
+    started_receipts = await Receipts().get_valid_job_started_receipts_for_miner(
+        miner.hotkey, timezone.now()
+    )
+    known_started_jobs: set[str] = {str(receipt.job_uuid) for receipt in started_receipts}

-    known_finished_jobs: set[str] = {
-        str(job_uuid)
-        async for job_uuid in JobFinishedReceipt.objects.filter(
-            job_uuid__in=known_started_jobs | preliminary_reservation_jobs,
-            miner_hotkey=miner.hotkey,
-        ).values_list("job_uuid", flat=True)
-    }
+    finished_receipts = await Receipts().get_job_finished_receipts_for_miner(
+        miner.hotkey, list(known_started_jobs | preliminary_reservation_jobs)
+    )
+    known_finished_jobs: set[str] = {str(receipt.job_uuid) for receipt in finished_receipts}

     maybe_ongoing_jobs = (
         preliminary_reservation_jobs
         | known_started_jobs
diff --git a/validator/app/src/compute_horde_validator/validator/s3.py b/validator/app/src/compute_horde_validator/validator/s3.py
index 47db09980..89b56d015 100644
--- a/validator/app/src/compute_horde_validator/validator/s3.py
+++ b/validator/app/src/compute_horde_validator/validator/s3.py
@@ -13,6 +13,17 @@
 logger = logging.getLogger(__name__)


+@contextlib.contextmanager
+def s3_client_context(**kwargs):
+    client = get_s3_client(**kwargs)
+    try:
+        yield client
+    finally:
+        # Ensure the client is properly closed
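+        # NOTE: this relies on botocore's private _endpoint attribute; recent
+        # botocore releases also expose a public client.close() for this.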
+        if hasattr(client, '_endpoint'):
+            client._endpoint.http_session.close()
+
+
 def get_s3_client(
     aws_access_key_id=None,
     aws_secret_access_key=None,
@@ -103,11 +114,12 @@ def download_prompts_from_s3_url(s3_url: str) -> list[str]:


 async def download_file_content(s3_url: str, client: httpx.AsyncClient | None = None) -> bytes:
-    if not client:
-        ctx = httpx.AsyncClient()
+    if client is None:
+        async with httpx.AsyncClient() as client:
+            response = await client.get(s3_url)
+            response.raise_for_status()
+            return response.content
     else:
-        ctx = contextlib.nullcontext(client)  # type: ignore
-    async with ctx as client:
         response = await client.get(s3_url)
         response.raise_for_status()
         return response.content
diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py
index c1be4ff1f..f26b6a99d 100644
--- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py
+++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py
@@ -83,6 +83,7 @@
     SyntheticJobBatch,
     SystemEvent,
 )
+from compute_horde_validator.validator.receipts.default import Receipts
 from compute_horde_validator.validator.synthetic_jobs.generator import current
 from compute_horde_validator.validator.synthetic_jobs.generator.base import (
     BaseSyntheticJobGenerator,
@@ -926,16 +927,16 @@ def _generate_job_started_receipt(ctx: BatchContext, job: Job) -> None:
     ttl = job.get_spin_up_time() + job_timeout_seconds + spinup_leeway_seconds
     ttl_clamped = max(ttl_min, min(ttl_max, ttl))

-    payload = JobStartedReceiptPayload(
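+    # create_job_started_receipt() now timestamps the payload and returns the
+    # validator signature alongside it, replacing the manual signing in this hunk.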
+    receipts_service = Receipts()
+    payload, signature = receipts_service.create_job_started_receipt(
         job_uuid=job.uuid,
         miner_hotkey=job.miner_hotkey,
         validator_hotkey=ctx.own_keypair.ss58_address,
-        timestamp=datetime.now(tz=UTC),
         executor_class=ExecutorClass(job.executor_class),
         is_organic=False,
         ttl=ttl_clamped,
     )
-    signature = f"0x{ctx.own_keypair.sign(payload.blob_for_signing()).hex()}"
+
     job.job_started_receipt_payload = payload
     job.job_started_receipt_signature = signature
diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_s3.py b/validator/app/src/compute_horde_validator/validator/tests/test_s3.py
index 00b860d75..a0863484d 100644
--- a/validator/app/src/compute_horde_validator/validator/tests/test_s3.py
+++ b/validator/app/src/compute_horde_validator/validator/tests/test_s3.py
@@ -1,14 +1,16 @@
-from unittest.mock import MagicMock, patch
+from unittest.mock import AsyncMock, MagicMock, patch

 import pytest
 from moto import mock_aws

 from compute_horde_validator.validator.s3 import (
+    download_file_content,
     download_prompts_from_s3_url,
     generate_download_url,
     generate_upload_url,
     get_public_url,
     get_s3_client,
+    s3_client_context,
 )


@@ -19,10 +21,15 @@ def bucket_name():

 @pytest.fixture(autouse=True)
 def bucket(bucket_name: str):
-    with mock_aws():
-        client = get_s3_client()
-        client.create_bucket(Bucket=bucket_name)
+    mock = mock_aws()
+    mock.start()
+
+    try:
+        with s3_client_context() as client:
+            client.create_bucket(Bucket=bucket_name)
         yield
+    finally:
+        mock.stop()


 def test_generate_upload_url(bucket_name: str):