From ec74ed0edfd4692abb686bfb9f867d12e2dc42b8 Mon Sep 17 00:00:00 2001 From: Enam Mijbah Noor Date: Tue, 13 Jan 2026 18:59:33 +0600 Subject: [PATCH 1/3] feat: make organic job execution flow sync Impacts: validator --- .../src/compute_horde_validator/settings.py | 6 + .../validator/job_excuses.py | 27 + .../commands/debug_run_organic_job.py | 8 +- .../organic_jobs/miner_driver_sync.py | 1149 +++++++++++++++++ .../validator/tasks.py | 7 +- .../test_miner_driver_sync.py | 339 +++++ validator/uv.lock | 2 +- 7 files changed, 1535 insertions(+), 3 deletions(-) create mode 100644 validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver_sync.py create mode 100644 validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_miner_driver_sync.py diff --git a/validator/app/src/compute_horde_validator/settings.py b/validator/app/src/compute_horde_validator/settings.py index 978bacb88..8af73b8e7 100644 --- a/validator/app/src/compute_horde_validator/settings.py +++ b/validator/app/src/compute_horde_validator/settings.py @@ -134,6 +134,7 @@ def wrapped(*args, **kwargs): CONSTANCE_BACKEND = "constance.backends.database.DatabaseBackend" CONSTANCE_DATABASE_CACHE_BACKEND = "default" CONSTANCE_CONFIG = { + "SYNC_ORGANIC_JOBS": (False, "SYNC_ORGANIC_JOBS", bool), "SERVING": ( not env.bool("MIGRATING", default=False), "Whether this validator is serving jobs and setting weights", @@ -257,6 +258,11 @@ def wrapped(*args, **kwargs): "Maximum number of organic jobs each miner can get scores for. Negative value means unlimited.", int, ), + "DYNAMIC_MAX_OVERALL_ORGANIC_JOB_TIME_LIMIT": ( + 300, + "Overall time to complete an organic job", + int, + ), "DYNAMIC_EXECUTOR_RESERVATION_TIME_LIMIT": ( 7, "Time for miner to accept or decline an organic job in seconds", diff --git a/validator/app/src/compute_horde_validator/validator/job_excuses.py b/validator/app/src/compute_horde_validator/validator/job_excuses.py index 0716a608d..3fc8ca8e9 100644 --- a/validator/app/src/compute_horde_validator/validator/job_excuses.py +++ b/validator/app/src/compute_horde_validator/validator/job_excuses.py @@ -114,3 +114,30 @@ async def get_expected_miner_executor_count( return 0 return latest_manifest.online_executor_count + + +def get_expected_miner_executor_count_sync( + check_time: datetime, + miner_hotkey: str, + executor_class: ExecutorClass, +) -> int: + latest_manifest = ( + MinerManifest.objects.filter( + miner__hotkey=miner_hotkey, + executor_class=executor_class, + created_at__lte=check_time, + ) + .order_by("created_at") + .only("online_executor_count") + .first() + ) + + if latest_manifest is None: + logger.warning( + f"Cannot check expected miner executor count: " + f"manifest not found " + f"({miner_hotkey} {executor_class} {check_time})" + ) + return 0 + + return latest_manifest.online_executor_count diff --git a/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_organic_job.py b/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_organic_job.py index 7913c7f16..699e05b6f 100644 --- a/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_organic_job.py +++ b/validator/app/src/compute_horde_validator/validator/management/commands/debug_run_organic_job.py @@ -1,5 +1,6 @@ import shlex import sys +import time import uuid from asgiref.sync import async_to_sync @@ -125,6 +126,11 @@ def handle(self, *args, **options): streaming_start_time_limit=options["streaming_start_time_limit"], ) + if 
settings.DEBUG_USE_MOCK_BLOCK_NUMBER: + block = 5136476 + int((time.time() - 1742076533) / 12) + else: + block = allowance().get_current_block() + job = OrganicJob.objects.create( job_uuid=str(job_request.uuid), miner=miner, @@ -134,7 +140,7 @@ def handle(self, *args, **options): namespace=job_request.job_namespace or job_request.docker_image or None, executor_class=job_request.executor_class, job_description="User job from facilitator", - block=allowance().get_current_block(), + block=block, ) async def _run_job(): diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver_sync.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver_sync.py new file mode 100644 index 000000000..f91abb1e0 --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver_sync.py @@ -0,0 +1,1149 @@ +import datetime +import logging +import random +import time +from collections.abc import Callable +from enum import Enum, auto +from typing import TypeVar, assert_never + +import bittensor +import bittensor_wallet +import sentry_sdk +import websockets +import websockets.sync.client +from asgiref.sync import async_to_sync +from channels.layers import get_channel_layer +from compute_horde.base.docker import DockerRunOptionsPreset +from compute_horde.executor_class import EXECUTOR_CLASS +from compute_horde.fv_protocol.facilitator_requests import OrganicJobRequest +from compute_horde.fv_protocol.validator_requests import ( + HordeFailureDetails, + JobFailureDetails, + JobRejectionDetails, + JobResultDetails, + JobStatusMetadata, + JobStatusUpdate, + StreamingServerDetails, +) +from compute_horde.job_errors import HordeError +from compute_horde.protocol_consts import ( + HordeFailureReason, + JobFailureReason, + JobParticipantType, + JobRejectionReason, + JobStatus, +) +from compute_horde.protocol_messages import ( + FailureContext, + GenericError, + MinerToValidatorMessage, + UnauthorizedError, + V0AcceptJobRequest, + V0DeclineJobRequest, + V0ExecutionDoneRequest, + V0ExecutorFailedRequest, + V0ExecutorManifestRequest, + V0ExecutorReadyRequest, + V0HordeFailedRequest, + V0InitialJobRequest, + V0JobAcceptedReceiptRequest, + V0JobFailedRequest, + V0JobFinishedReceiptRequest, + V0JobFinishedRequest, + V0JobRequest, + V0StreamingJobNotReadyRequest, + V0StreamingJobReadyRequest, + V0VolumesReadyRequest, + ValidatorAuthForMiner, + ValidatorToMinerMessage, +) +from compute_horde.receipts.models import JobAcceptedReceipt, JobFinishedReceipt, JobStartedReceipt +from compute_horde.receipts.schemas import ( + JobAcceptedReceiptPayload, + JobFinishedReceiptPayload, + JobStartedReceiptPayload, +) +from compute_horde.utils import Timer, sign_blob +from django.conf import settings +from pydantic import TypeAdapter + +from compute_horde_validator.validator import job_excuses +from compute_horde_validator.validator.allowance.default import allowance +from compute_horde_validator.validator.allowance.types import ValidatorModel +from compute_horde_validator.validator.allowance.utils.supertensor import supertensor +from compute_horde_validator.validator.dynamic_config import get_config +from compute_horde_validator.validator.models import ( + Miner, + OrganicJob, + SystemEvent, +) +from compute_horde_validator.validator.routing.default import routing +from compute_horde_validator.validator.routing.types import JobRoute, MinerIncidentType +from compute_horde_validator.validator.utils import TRUSTED_MINER_FAKE_KEY + +logger = 
logging.getLogger(__name__) + +_MinerMsgT = TypeVar("_MinerMsgT", bound=MinerToValidatorMessage) + + +def _get_current_block_sync() -> int: + if settings.DEBUG_USE_MOCK_BLOCK_NUMBER: + block = 5136476 + int((time.time() - 1742076533) / 12) + else: + block = allowance().get_current_block() + return block + + +def _get_active_validators(block: int | None) -> list[ValidatorModel]: + if settings.DEBUG_USE_MOCK_BLOCK_NUMBER: + return [] + + if block is None: + block = supertensor().get_current_block() + return supertensor().list_validators(block) + + +class MinerClient: + def __init__( + self, + url: str, + miner_hotkey: str, + validator_keypair: bittensor.Keypair, + connect_timeout: float = 10.0, + recv_timeout: float = 5.0, + max_retries: int = 5, + base_retry_delay: float = 1.0, + retry_jitter: float = 1.0, + ) -> None: + self.url = url + self.miner_hotkey = miner_hotkey + self.validator_keypair = validator_keypair + self.connect_timeout = connect_timeout + self.recv_timeout = recv_timeout + self.max_retries = max_retries + self.base_retry_delay = base_retry_delay + self.retry_jitter = retry_jitter + self.ws: websockets.sync.client.ClientConnection | None = None + + def _get_retry_delay(self, attempt: int) -> float: + return self.base_retry_delay * float(2**attempt) + random.uniform(0, self.retry_jitter) + + def connect(self) -> None: + for attempt in range(self.max_retries): + try: + self.ws = websockets.sync.client.connect( + self.url, + open_timeout=self.connect_timeout, + close_timeout=self.recv_timeout, + max_size=50 * (2**20), # 50MB + ) + # TODO: After async organic jobs is removed, move auth to connection headers + # instead of separate message + msg = ValidatorAuthForMiner( + validator_hotkey=self.validator_keypair.ss58_address, + miner_hotkey=self.miner_hotkey, + timestamp=int(time.time()), + signature="", + ) + msg.signature = sign_blob(self.validator_keypair, msg.blob_for_signing()) + self.ws.send(msg.model_dump_json()) + if attempt > 0: + logger.info( + "Connected to miner %s after %d retries", self.miner_hotkey, attempt + ) + return + except (websockets.WebSocketException, OSError): + self.close() + if attempt >= self.max_retries - 1: + raise + delay = self._get_retry_delay(attempt) + logger.info( + "Retrying connection to miner %s in %.2fs (attempt %d/%d)", + self.miner_hotkey, + delay, + attempt + 1, + self.max_retries, + ) + time.sleep(delay) + raise AssertionError("unreachable") + + def close(self) -> None: + try: + if self.ws: + self.ws.close() + finally: + self.ws = None + + def _reconnect(self) -> None: + self.close() + self.connect() + + def send(self, msg: ValidatorToMinerMessage) -> None: + assert self.ws is not None + raw = msg.model_dump_json() + logger.debug("Sending message to miner %s: %s", self.miner_hotkey, raw) + + for attempt in range(self.max_retries): + try: + self.ws.send(raw) + return + except (websockets.WebSocketException, OSError): + if attempt >= self.max_retries - 1: + raise + delay = self._get_retry_delay(attempt) + logger.info( + "Could not send msg to miner %s, reconnecting in %.2fs (attempt %d/%d)", + self.miner_hotkey, + delay, + attempt + 1, + self.max_retries, + ) + time.sleep(delay) + self._reconnect() + raise AssertionError("unreachable") + + def recv(self) -> MinerToValidatorMessage: + assert self.ws is not None + + for attempt in range(self.max_retries): + try: + raw = self.ws.recv(timeout=self.recv_timeout, decode=False) + logger.debug("Received message from miner %s: %s", self.miner_hotkey, raw) + return 
TypeAdapter(MinerToValidatorMessage).validate_json(raw) + except TimeoutError: + # the caller handles recv timeouts + raise + except (websockets.WebSocketException, OSError): + if attempt >= self.max_retries - 1: + raise + delay = self._get_retry_delay(attempt) + logger.info( + "Could not receive msg from miner %s, reconnecting in %.2fs (attempt %d/%d)", + self.miner_hotkey, + delay, + attempt + 1, + self.max_retries, + ) + time.sleep(delay) + self._reconnect() + raise AssertionError("unreachable") + + +class DriverState(Enum): + CONNECT = auto() + RESERVE_EXECUTOR = auto() + WAIT_JOB_ACCEPTED = auto() + SEND_JOB_ACCEPTED_RECEIPT = auto() + WAIT_EXECUTOR_READY = auto() + PREPARE_VOLUMES = auto() + WAIT_VOLUMES_READY = auto() + WAIT_STREAMING_JOB_READY = auto() + WAIT_EXECUTION_DONE = auto() + COLLECT_RESULTS = auto() + COMPLETE = auto() + FAILED = auto() + + +_job_event_subtype_map: dict[JobFailureReason, SystemEvent.EventSubType] = { + JobFailureReason.UNKNOWN: SystemEvent.EventSubType.GENERIC_ERROR, + JobFailureReason.TIMEOUT: SystemEvent.EventSubType.JOB_TIMEOUT, + JobFailureReason.NONZERO_RETURN_CODE: SystemEvent.EventSubType.JOB_PROCESS_NONZERO_EXIT_CODE, + JobFailureReason.DOWNLOAD_FAILED: SystemEvent.EventSubType.JOB_VOLUME_DOWNLOAD_FAILED, + JobFailureReason.UPLOAD_FAILED: SystemEvent.EventSubType.JOB_RESULT_UPLOAD_FAILED, +} + +_horde_event_subtype_map: dict[HordeFailureReason, SystemEvent.EventSubType] = { + HordeFailureReason.MINER_CONNECTION_FAILED: SystemEvent.EventSubType.MINER_CONNECTION_ERROR, + HordeFailureReason.INITIAL_RESPONSE_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, + HordeFailureReason.EXECUTOR_READINESS_RESPONSE_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, + HordeFailureReason.STREAMING_JOB_READY_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, + HordeFailureReason.VOLUMES_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, + HordeFailureReason.EXECUTION_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, + HordeFailureReason.FINAL_RESPONSE_TIMED_OUT: SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, + HordeFailureReason.SECURITY_CHECK_FAILED: SystemEvent.EventSubType.ERROR_FAILED_SECURITY_CHECK, + HordeFailureReason.STREAMING_FAILED: SystemEvent.EventSubType.GENERIC_ERROR, + HordeFailureReason.UNHANDLED_EXCEPTION: SystemEvent.EventSubType.GENERIC_ERROR, + HordeFailureReason.UNKNOWN: SystemEvent.EventSubType.GENERIC_ERROR, +} + + +class SyncOrganicJobDriver: + def __init__( + self, + miner_client: MinerClient, + job: OrganicJob, + request: OrganicJobRequest, + *, + miner_hotkey: str, + my_keypair: bittensor.Keypair, + allowed_leeway: int, + reservation_time_limit: int, + executor_startup_time_limit: int, + max_overall_time_limit: int, + status_callback: Callable[[JobStatusUpdate], None] | None = None, + ) -> None: + self.miner_client = miner_client + self.job = job + self.request = request + self.miner_hotkey = miner_hotkey + self.my_keypair = my_keypair + + self.allowed_leeway = allowed_leeway + self.reservation_time_limit = reservation_time_limit + self.executor_startup_time_limit = executor_startup_time_limit + + self.status_callback = status_callback or (lambda _status: None) + self._state = DriverState.CONNECT + self._started_at = time.time() + self._global_deadline = self._started_at + max_overall_time_limit + self._deadline: Timer | None = None + + def _record_event( + self, + type: SystemEvent.EventType, + subtype: SystemEvent.EventSubType, 
+ long_description: str, + ) -> None: + if self.job.on_trusted_miner and get_config("DYNAMIC_DISABLE_TRUSTED_ORGANIC_JOB_EVENTS"): + return + + event_data = {"job_uuid": str(self.job.job_uuid), "miner_hotkey": self.miner_hotkey} + + SystemEvent.objects.using(settings.DEFAULT_DB_ALIAS).create( + type=type, + subtype=subtype, + long_description=long_description, + data=event_data, + ) + + def _report_incident(self, incident_type: MinerIncidentType) -> None: + routing().report_miner_incident( + incident_type, + hotkey_ss58address=self.job.miner.hotkey, + job_uuid=str(self.job.job_uuid), + executor_class=self.request.executor_class, + ) + + def _handle_horde_failure( + self, + comment: str, + event_subtype: SystemEvent.EventSubType, + horde_failure_reason: HordeFailureReason, + reported_by: JobParticipantType = JobParticipantType.VALIDATOR, + horde_failure_context: FailureContext | None = None, + incident_type: MinerIncidentType | None = None, + log_exc_info: bool = False, + ) -> DriverState: + logger.warning(comment, exc_info=log_exc_info) + self.job.status = OrganicJob.Status.FAILED + self.job.comment = comment + self.job.save(update_fields=["status", "comment"]) + + self._record_event( + type=SystemEvent.EventType.MINER_ORGANIC_JOB_FAILURE, + subtype=event_subtype, + long_description=comment, + ) + + if incident_type: + self._report_incident(incident_type) + + self.status_callback( + JobStatusUpdate( + uuid=str(self.job.job_uuid), + status=JobStatus.FAILED, + metadata=JobStatusMetadata( + horde_failure_details=HordeFailureDetails( + reported_by=reported_by, + reason=horde_failure_reason, + message=comment, + context=horde_failure_context, + ), + ), + ) + ) + return DriverState.FAILED + + def _handle_decline_job(self, msg: V0DeclineJobRequest) -> DriverState: + """Handle job decline/rejection from miner, including excuse validation.""" + if msg.reason != JobRejectionReason.BUSY: + comment = f"Miner rejected job: {msg.message}" + self.job.status = OrganicJob.Status.FAILED + system_event_subtype = SystemEvent.EventSubType.JOB_REJECTED + else: + job_started_receipt = JobStartedReceipt.objects.get(job_uuid=str(self.job.job_uuid)) + job_request_time = job_started_receipt.timestamp + active_validators = _get_active_validators(self.job.block) + valid_excuses = job_excuses.filter_valid_excuse_receipts( + receipts_to_check=msg.receipts or [], + check_time=job_request_time, + declined_job_uuid=str(self.job.job_uuid), + declined_job_executor_class=self.request.executor_class, + declined_job_is_synthetic=False, + minimum_validator_stake_for_excuse=get_config( + "DYNAMIC_MINIMUM_VALIDATOR_STAKE_FOR_EXCUSE" + ), + miner_hotkey=self.job.miner.hotkey, + active_validators=active_validators, + ) + expected_executor_count = job_excuses.get_expected_miner_executor_count_sync( + check_time=job_request_time, + miner_hotkey=self.job.miner.hotkey, + executor_class=self.request.executor_class, + ) + if len(valid_excuses) >= expected_executor_count: + comment = "Miner properly excused job" + self.job.status = OrganicJob.Status.EXCUSED + system_event_subtype = SystemEvent.EventSubType.JOB_EXCUSED + else: + comment = "Miner failed to excuse job" + self.job.status = OrganicJob.Status.FAILED + system_event_subtype = SystemEvent.EventSubType.JOB_REJECTED + + logger.info(comment) + self.job.comment = comment + self.job.save(update_fields=["status", "comment"]) + + if self.job.status != OrganicJob.Status.EXCUSED: + self._report_incident(MinerIncidentType.MINER_JOB_REJECTED) + + self.status_callback( + JobStatusUpdate( + 
uuid=str(self.job.job_uuid), + status=JobStatus.REJECTED, + metadata=JobStatusMetadata( + job_rejection_details=JobRejectionDetails( + rejected_by=JobParticipantType.MINER, + reason=msg.reason, + message=comment, + context=msg.context, + ), + ), + ) + ) + + self._record_event( + type=SystemEvent.EventType.MINER_ORGANIC_JOB_FAILURE, + subtype=system_event_subtype, + long_description=comment, + ) + return DriverState.FAILED + + def _handle_job_failed(self, msg: V0JobFailedRequest) -> DriverState: + """Handle V0JobFailedRequest from miner.""" + comment = msg.message or "Miner reported job failure" + logger.info("Miner reported job failure: %s", comment) + self.job.status = OrganicJob.Status.FAILED + self.job.comment = comment + self.job.save(update_fields=["status", "comment"]) + + self._report_incident(MinerIncidentType.MINER_JOB_FAILED) + subtype = _job_event_subtype_map.get( + msg.reason, SystemEvent.EventSubType.GENERIC_JOB_FAILURE + ) + self.status_callback( + JobStatusUpdate( + uuid=str(self.job.job_uuid), + status=JobStatus.FAILED, + metadata=JobStatusMetadata( + job_failure_details=JobFailureDetails( + reason=msg.reason, + stage=msg.stage, + message=msg.message, + context=msg.context, + docker_process_exit_status=msg.docker_process_exit_status, + docker_process_stdout=msg.docker_process_stdout, + docker_process_stderr=msg.docker_process_stderr, + ), + ), + ) + ) + + self._record_event( + type=SystemEvent.EventType.MINER_ORGANIC_JOB_FAILURE, + subtype=subtype, + long_description=comment, + ) + return DriverState.FAILED + + def _handle_miner_horde_failed_message(self, msg: V0HordeFailedRequest) -> DriverState: + """Handle V0HordeFailedRequest from miner.""" + comment = msg.message or "Miner reported horde failure" + logger.info("Miner reported horde failure: %s", comment) + self.job.status = OrganicJob.Status.FAILED + self.job.comment = comment + self.job.save(update_fields=["status", "comment"]) + + self._report_incident(MinerIncidentType.MINER_HORDE_FAILED) + subtype = _horde_event_subtype_map.get(msg.reason, SystemEvent.EventSubType.GENERIC_ERROR) + self.status_callback( + JobStatusUpdate( + uuid=str(self.job.job_uuid), + status=JobStatus.HORDE_FAILED, + metadata=JobStatusMetadata( + horde_failure_details=HordeFailureDetails( + reported_by=msg.reported_by, + reason=msg.reason, + message=msg.message, + context=msg.context, + ), + ), + ) + ) + + self._record_event( + type=SystemEvent.EventType.MINER_ORGANIC_JOB_FAILURE, + subtype=subtype, + long_description=comment, + ) + return DriverState.FAILED + + def _wait_for_message( + self, + expected_message_type: type[_MinerMsgT], + timeout: float, + timeout_status_reason: HordeFailureReason, + ) -> _MinerMsgT | DriverState: + """ + Waits for a specific message type from the miner within a given timeout period. + Returns the received message if successful. + Returns a terminal DriverState if the timeout is reached or if an error message is received. + """ + deadline = time.time() + timeout + while True: + if time.time() >= min(deadline, self._global_deadline): + # NOTE: No incident reported - timeouts are not miner incidents, + # consistent with the async implementation. + return self._handle_horde_failure( + comment=f"Timed out waiting for miner (reason={timeout_status_reason})", + event_subtype=SystemEvent.EventSubType.ERROR_VALIDATOR_REPORTED_TIMEOUT, + horde_failure_reason=timeout_status_reason, + ) + + try: + msg = self.miner_client.recv() + except TimeoutError: + # Individual recv timeouts are not considered errors. 
Timeouts are handled above. + continue + + if isinstance(msg, V0ExecutorManifestRequest): + # NOTE: they are not used in organic jobs + continue + + # Validate that the message is for the correct job + received_job_uuid = getattr(msg, "job_uuid", str(self.job.job_uuid)) + if received_job_uuid != str(self.job.job_uuid): + logger.warning( + "Received msg for a different job (expected job %s, got job %s): %s", + self.job.job_uuid, + received_job_uuid, + msg, + ) + continue + + if isinstance(msg, expected_message_type): + return msg + + # Handle legacy messages by mapping them to V0HordeFailedRequest + if isinstance(msg, V0StreamingJobNotReadyRequest): + msg = V0HordeFailedRequest( + job_uuid=msg.job_uuid, + reported_by=JobParticipantType.MINER, + reason=HordeFailureReason.STREAMING_FAILED, + message="Executor reported legacy V0StreamingJobNotReadyRequest message", + ) + elif isinstance(msg, V0ExecutorFailedRequest): + msg = V0HordeFailedRequest( + job_uuid=msg.job_uuid, + reported_by=JobParticipantType.MINER, + reason=HordeFailureReason.GENERIC_ERROR, + message="Executor reported legacy V0ExecutorFailedRequest message", + ) + + if isinstance(msg, GenericError): + closing_phrases = ["Unknown validator", "Inactive validator", "not authenticated"] + # TODO: move these messages from GenericError to UnauthorizedError in miner + for close_phrase in closing_phrases: + if msg.details and close_phrase in msg.details: + return self._handle_horde_failure( + comment=f"Miner closed connection: {msg.details}", + event_subtype=SystemEvent.EventSubType.UNAUTHORIZED, + horde_failure_reason=HordeFailureReason.MINER_CONNECTION_FAILED, + reported_by=JobParticipantType.MINER, + incident_type=MinerIncidentType.MINER_JOB_REJECTED, + ) + else: + self._record_event( + type=SystemEvent.EventType.MINER_ORGANIC_JOB_FAILURE, + subtype=SystemEvent.EventSubType.GENERIC_ERROR, + long_description=f"Received error message from miner {self.miner_hotkey}: {msg.model_dump_json()}", + ) + # NOTE: To align with the old behavior, we do not mark the job as failed here. + # In theory, miner can still continue after sending a GenericError. 
+ continue + + if isinstance(msg, UnauthorizedError): + return self._handle_horde_failure( + comment=f"Miner returned unauthorized error: {msg.code} {msg.details}", + event_subtype=SystemEvent.EventSubType.UNAUTHORIZED, + horde_failure_reason=HordeFailureReason.MINER_CONNECTION_FAILED, + reported_by=JobParticipantType.MINER, + ) + + if isinstance(msg, V0DeclineJobRequest): + return self._handle_decline_job(msg) + + if isinstance(msg, V0JobFailedRequest): + return self._handle_job_failed(msg) + + if isinstance(msg, V0HordeFailedRequest): + return self._handle_miner_horde_failed_message(msg) + + logger.info( + "Ignoring %s while waiting for %s for job %s", + msg.__class__.__name__, + expected_message_type.__name__, + self.job.job_uuid, + ) + + def run(self) -> None: + try: + self._run() + except Exception as exc: + sentry_sdk.capture_exception(exc) + error = HordeError.wrap_unhandled(exc) + event_subtype = _horde_event_subtype_map.get( + error.reason, SystemEvent.EventSubType.GENERIC_ERROR + ) + self._state = self._handle_horde_failure( + comment=str(error), + event_subtype=event_subtype, + horde_failure_reason=error.reason, + horde_failure_context=error.context, + ) + finally: + self._send_job_finished_receipt_if_failed() + self._undo_allowance_if_failed() + self.miner_client.close() + + def _send_job_finished_receipt_if_failed(self) -> None: + if self._state == DriverState.COMPLETE: + return + + if self.miner_client.ws is None: + return + + try: + started_timestamp = datetime.datetime.fromtimestamp(self._started_at, datetime.UTC) + receipt_payload = JobFinishedReceiptPayload( + job_uuid=str(self.job.job_uuid), + miner_hotkey=self.miner_hotkey, + validator_hotkey=self.my_keypair.ss58_address, + timestamp=datetime.datetime.now(datetime.UTC), + time_started=started_timestamp, + time_took_us=int((time.time() - self._started_at) * 1_000_000), + block_numbers=[], + score_str="0.000000", + ) + receipt_signature = sign_blob(self.my_keypair, receipt_payload.blob_for_signing()) + self.miner_client.send( + V0JobFinishedReceiptRequest(payload=receipt_payload, signature=receipt_signature) + ) + JobFinishedReceipt.from_payload( + receipt_payload, + validator_signature=receipt_signature, + ).save() + except Exception as exc: + logger.warning( + "Failed to send/persist JobFinishedReceipt for failed job %s: %s", + self.job.job_uuid, + exc, + exc_info=True, + ) + + def _undo_allowance_if_failed(self) -> None: + if self._state == DriverState.COMPLETE: + return + + if self.job.allowance_reservation_id is None: + return + + try: + allowance().undo_allowance_reservation(self.job.allowance_reservation_id) + logger.info( + "Successfully undid allowance reservation %s for failed job %s", + self.job.allowance_reservation_id, + self.job.job_uuid, + ) + except Exception as exc: + logger.error( + "Failed to undo allowance reservation %s for failed job %s: %s", + self.job.allowance_reservation_id, + self.job.job_uuid, + exc, + exc_info=True, + ) + + def _run(self) -> None: + while True: + match self._state: + case DriverState.CONNECT: + self._state = self._connect() + case DriverState.RESERVE_EXECUTOR: + self._state = self._reserve_executor() + case DriverState.WAIT_JOB_ACCEPTED: + self._state = self._wait_job_accepted() + case DriverState.SEND_JOB_ACCEPTED_RECEIPT: + self._state = self._send_job_accepted_receipt() + case DriverState.WAIT_EXECUTOR_READY: + self._state = self._wait_executor_ready() + case DriverState.PREPARE_VOLUMES: + self._state = self._prepare_volumes() + case DriverState.WAIT_VOLUMES_READY: + 
self._state = self._wait_volumes_ready() + case DriverState.WAIT_STREAMING_JOB_READY: + self._state = self._wait_streaming_job_ready() + case DriverState.WAIT_EXECUTION_DONE: + self._state = self._wait_execution_done() + case DriverState.COLLECT_RESULTS: + self._state = self._collect_results() + case DriverState.COMPLETE: + return + case DriverState.FAILED: + return + case _: + assert_never(self._state) + + def _connect(self) -> DriverState: + assert self._state == DriverState.CONNECT + + try: + self.miner_client.connect() + except Exception as exc: + return self._handle_horde_failure( + comment=f"Miner connection failed: {exc}", + event_subtype=SystemEvent.EventSubType.MINER_CONNECTION_ERROR, + horde_failure_reason=HordeFailureReason.MINER_CONNECTION_FAILED, + log_exc_info=True, + ) + + return DriverState.RESERVE_EXECUTOR + + def _reserve_executor(self) -> DriverState: + assert self._state == DriverState.RESERVE_EXECUTOR + + receipt_payload = JobStartedReceiptPayload( + job_uuid=str(self.job.job_uuid), + miner_hotkey=self.miner_hotkey, + validator_hotkey=self.my_keypair.ss58_address, + timestamp=datetime.datetime.now(datetime.UTC), + executor_class=self.request.executor_class, + is_organic=True, + ttl=self.reservation_time_limit, + ) + receipt_signature = sign_blob(self.my_keypair, receipt_payload.blob_for_signing()) + + executor_timing = V0InitialJobRequest.ExecutorTimingDetails( + allowed_leeway=self.allowed_leeway, + download_time_limit=int(self.request.download_time_limit), + execution_time_limit=int(self.request.execution_time_limit), + upload_time_limit=int(self.request.upload_time_limit), + streaming_start_time_limit=int(self.request.streaming_start_time_limit), + ) + + msg = V0InitialJobRequest( + job_uuid=str(self.job.job_uuid), + executor_class=self.request.executor_class, + docker_image=self.request.docker_image, + timeout_seconds=None, + volume=self.request.volume, + job_started_receipt_payload=receipt_payload, + job_started_receipt_signature=receipt_signature, + streaming_details=self.request.streaming_details, + executor_timing=executor_timing, + ) + + try: + self.miner_client.send(msg) + except Exception as exc: + return self._handle_horde_failure( + comment=f"Failed to send initial job request: {exc}", + event_subtype=SystemEvent.EventSubType.MINER_SEND_ERROR, + horde_failure_reason=HordeFailureReason.MINER_CONNECTION_FAILED, + log_exc_info=True, + ) + + try: + JobStartedReceipt.from_payload( + receipt_payload, + validator_signature=receipt_signature, + ).save() + except Exception as exc: + # Receipt persistence failure should not fail the job. 
+            logger.warning(
+                "Failed to persist JobStartedReceipt for job %s: %s",
+                self.job.job_uuid,
+                exc,
+                exc_info=True,
+            )
+
+        return DriverState.WAIT_JOB_ACCEPTED
+
+    def _wait_job_accepted(self) -> DriverState:
+        assert self._state == DriverState.WAIT_JOB_ACCEPTED
+
+        result = self._wait_for_message(
+            V0AcceptJobRequest,
+            timeout=self.reservation_time_limit,
+            timeout_status_reason=HordeFailureReason.INITIAL_RESPONSE_TIMED_OUT,
+        )
+        if isinstance(result, DriverState):
+            return result
+
+        self.status_callback(
+            JobStatusUpdate(uuid=str(self.job.job_uuid), status=JobStatus.ACCEPTED)
+        )
+
+        return DriverState.SEND_JOB_ACCEPTED_RECEIPT
+
+    def _send_job_accepted_receipt(self) -> DriverState:
+        assert self._state == DriverState.SEND_JOB_ACCEPTED_RECEIPT
+
+        executor_spinup_time_limit = EXECUTOR_CLASS[self.request.executor_class].spin_up_time
+        readiness_time_limit = executor_spinup_time_limit + self.executor_startup_time_limit
+        job_accepted_receipt_ttl = (
+            readiness_time_limit
+            + self.allowed_leeway
+            + self.request.download_time_limit
+            + self.request.execution_time_limit
+            + self.request.upload_time_limit
+            + self.request.streaming_start_time_limit
+        )
+        receipt_payload = JobAcceptedReceiptPayload(
+            job_uuid=str(self.job.job_uuid),
+            miner_hotkey=self.miner_hotkey,
+            validator_hotkey=self.my_keypair.ss58_address,
+            timestamp=datetime.datetime.now(datetime.UTC),
+            time_accepted=datetime.datetime.now(datetime.UTC),
+            ttl=job_accepted_receipt_ttl,
+        )
+        receipt_signature = sign_blob(self.my_keypair, receipt_payload.blob_for_signing())
+        receipt_msg = V0JobAcceptedReceiptRequest(
+            payload=receipt_payload, signature=receipt_signature
+        )
+
+        try:
+            self.miner_client.send(receipt_msg)
+            JobAcceptedReceipt.from_payload(
+                receipt_payload,
+                validator_signature=receipt_signature,
+            ).save()
+        except Exception as exc:
+            comment = (
+                f"Failed to send/persist JobAcceptedReceipt for job {self.job.job_uuid}: {exc}"
+            )
+            logger.warning(comment, exc_info=True)
+            self._record_event(
+                type=SystemEvent.EventType.RECEIPT_FAILURE,
+                subtype=SystemEvent.EventSubType.RECEIPT_SEND_ERROR,
+                long_description=comment,
+            )
+            # Consistent with the async implementation, a receipt send/persist failure does
+            # not fail the job: it is logged and recorded as an event, and the flow continues.
+
+        return DriverState.WAIT_EXECUTOR_READY
+
+    def _wait_executor_ready(self) -> DriverState:
+        assert self._state == DriverState.WAIT_EXECUTOR_READY
+
+        executor_spinup_time_limit = EXECUTOR_CLASS[self.request.executor_class].spin_up_time
+        readiness_time_limit = executor_spinup_time_limit + self.executor_startup_time_limit
+
+        result = self._wait_for_message(
+            V0ExecutorReadyRequest,
+            timeout=readiness_time_limit,
+            timeout_status_reason=HordeFailureReason.EXECUTOR_READINESS_RESPONSE_TIMED_OUT,
+        )
+        if isinstance(result, DriverState):
+            return result
+
+        # Initialize the deadline timer with allowed_leeway.
+        # Subsequent stages will extend this deadline.
+ self._deadline = Timer(self.allowed_leeway) + + self.status_callback( + JobStatusUpdate( + uuid=str(self.job.job_uuid), + status=JobStatus.EXECUTOR_READY, + ) + ) + return DriverState.PREPARE_VOLUMES + + def _prepare_volumes(self) -> DriverState: + assert self._state == DriverState.PREPARE_VOLUMES + + has_gpu = EXECUTOR_CLASS[self.request.executor_class].has_gpu + docker_run_options_preset: DockerRunOptionsPreset = "nvidia_all" if has_gpu else "none" + + msg = V0JobRequest( + job_uuid=str(self.job.job_uuid), + executor_class=self.request.executor_class, + docker_image=self.request.docker_image, + docker_run_options_preset=docker_run_options_preset, + docker_run_cmd=self.request.get_args(), + env=self.request.env, + volume=None, # already sent in V0InitialJobRequest + output_upload=self.request.output_upload, + artifacts_dir=self.request.artifacts_dir, + ) + + try: + self.miner_client.send(msg) + except Exception as exc: + return self._handle_horde_failure( + comment=f"Failed to send job request: {exc}", + event_subtype=SystemEvent.EventSubType.MINER_SEND_ERROR, + horde_failure_reason=HordeFailureReason.MINER_CONNECTION_FAILED, + log_exc_info=True, + ) + + return DriverState.WAIT_VOLUMES_READY + + def _wait_volumes_ready(self) -> DriverState: + assert self._state == DriverState.WAIT_VOLUMES_READY + assert self._deadline is not None + + self._deadline.extend_timeout(self.request.download_time_limit) + + result = self._wait_for_message( + V0VolumesReadyRequest, + timeout=self._deadline.time_left(), + timeout_status_reason=HordeFailureReason.VOLUMES_TIMED_OUT, + ) + if isinstance(result, DriverState): + return result + + self.status_callback( + JobStatusUpdate(uuid=str(self.job.job_uuid), status=JobStatus.VOLUMES_READY) + ) + + if self.request.streaming_details: + return DriverState.WAIT_STREAMING_JOB_READY + else: + return DriverState.WAIT_EXECUTION_DONE + + def _wait_streaming_job_ready(self) -> DriverState: + assert self._state == DriverState.WAIT_STREAMING_JOB_READY + assert self._deadline is not None + + self._deadline.extend_timeout(self.request.streaming_start_time_limit) + + result = self._wait_for_message( + V0StreamingJobReadyRequest, + timeout=self._deadline.time_left(), + timeout_status_reason=HordeFailureReason.STREAMING_JOB_READY_TIMED_OUT, + ) + if isinstance(result, DriverState): + return result + + self.status_callback( + JobStatusUpdate( + uuid=str(self.job.job_uuid), + status=JobStatus.STREAMING_READY, + metadata=JobStatusMetadata( + streaming_details=StreamingServerDetails( + streaming_server_cert=result.public_key, + streaming_server_address=result.ip, + streaming_server_port=result.port, + ) + ), + ) + ) + + return DriverState.WAIT_EXECUTION_DONE + + def _wait_execution_done(self) -> DriverState: + assert self._state == DriverState.WAIT_EXECUTION_DONE + assert self._deadline is not None + + self._deadline.extend_timeout(self.request.execution_time_limit) + + result = self._wait_for_message( + V0ExecutionDoneRequest, + timeout=self._deadline.time_left(), + timeout_status_reason=HordeFailureReason.EXECUTION_TIMED_OUT, + ) + if isinstance(result, DriverState): + return result + + self.status_callback( + JobStatusUpdate(uuid=str(self.job.job_uuid), status=JobStatus.EXECUTION_DONE) + ) + return DriverState.COLLECT_RESULTS + + def _collect_results(self) -> DriverState: + assert self._state == DriverState.COLLECT_RESULTS + assert self._deadline is not None + + self._deadline.extend_timeout(self.request.upload_time_limit) + + result = self._wait_for_message( + 
V0JobFinishedRequest, + timeout=self._deadline.time_left(), + timeout_status_reason=HordeFailureReason.FINAL_RESPONSE_TIMED_OUT, + ) + if isinstance(result, DriverState): + return result + + if self.job.allowance_reservation_id is not None: + try: + allowance().spend_allowance(self.job.allowance_reservation_id) + logger.info( + "Successfully spent allowance for reservation %s for job %s", + self.job.allowance_reservation_id, + self.job.job_uuid, + ) + except Exception as exc: + logger.error( + "Failed to spend allowance for reservation %s for job %s: %s", + self.job.allowance_reservation_id, + self.job.job_uuid, + exc, + exc_info=True, + ) + + self.job.stdout = result.docker_process_stdout + self.job.stderr = result.docker_process_stderr + self.job.artifacts = result.artifacts or {} + self.job.upload_results = result.upload_results or {} + self.job.status = OrganicJob.Status.COMPLETED + comment = f"Miner job finished: stdout={len(self.job.stdout)} bytes stderr={len(self.job.stderr)} bytes" + self.job.comment = comment + self.job.save( + update_fields=["stdout", "stderr", "artifacts", "upload_results", "status", "comment"] + ) + + self._record_event( + type=SystemEvent.EventType.MINER_ORGANIC_JOB_SUCCESS, + subtype=SystemEvent.EventSubType.SUCCESS, + long_description=comment, + ) + + try: + started_timestamp = datetime.datetime.fromtimestamp(self._started_at, datetime.UTC) + receipt_payload = JobFinishedReceiptPayload( + job_uuid=str(self.job.job_uuid), + miner_hotkey=self.miner_hotkey, + validator_hotkey=self.my_keypair.ss58_address, + timestamp=datetime.datetime.now(datetime.UTC), + time_started=started_timestamp, + time_took_us=int((time.time() - self._started_at) * 1_000_000), + block_numbers=self.job.allowance_blocks or [], + score_str=f"{float(self.job.allowance_job_value or 0):.6f}", + ) + receipt_signature = sign_blob(self.my_keypair, receipt_payload.blob_for_signing()) + self.miner_client.send( + V0JobFinishedReceiptRequest(payload=receipt_payload, signature=receipt_signature) + ) + JobFinishedReceipt.from_payload( + receipt_payload, + validator_signature=receipt_signature, + ).save() + except Exception as exc: + logger.warning( + "Failed to send/persist JobFinishedReceipt for job %s: %s", + self.job.job_uuid, + exc, + exc_info=True, + ) + + self.status_callback( + JobStatusUpdate( + uuid=str(self.job.job_uuid), + status=JobStatus.COMPLETED, + metadata=JobStatusMetadata( + miner_response=JobResultDetails( + docker_process_stdout=self.job.stdout, + docker_process_stderr=self.job.stderr, + artifacts=self.job.artifacts, + upload_results=self.job.upload_results, + ) + ), + ) + ) + + return DriverState.COMPLETE + + +def execute_organic_job_request_sync( + job_request: OrganicJobRequest, job_route: JobRoute +) -> OrganicJob: + if ( + job_route.miner.hotkey_ss58 == settings.DEBUG_MINER_KEY + and settings.DEBUG_MINER_ADDRESS + and settings.DEBUG_MINER_PORT + ): + miner_ip = settings.DEBUG_MINER_ADDRESS + miner_port = settings.DEBUG_MINER_PORT + ip_type = 4 + on_trusted_miner = False + elif job_route.miner.hotkey_ss58 == TRUSTED_MINER_FAKE_KEY: + miner_ip = settings.TRUSTED_MINER_ADDRESS + miner_port = settings.TRUSTED_MINER_PORT + ip_type = 4 + on_trusted_miner = True + else: + miner_ip = job_route.miner.address + miner_port = job_route.miner.port + ip_type = job_route.miner.ip_version + on_trusted_miner = False + + miner = Miner.objects.get(hotkey=job_route.miner.hotkey_ss58) + job = OrganicJob.objects.create( + job_uuid=str(job_request.uuid), + miner=miner, + miner_address=miner_ip, + 
miner_address_ip_version=ip_type, + miner_port=miner_port, + namespace=job_request.job_namespace or job_request.docker_image or None, + executor_class=job_request.executor_class, + job_description="User job from facilitator", + block=_get_current_block_sync(), + on_trusted_miner=on_trusted_miner, + streaming_details=job_request.streaming_details.model_dump() + if job_request.streaming_details + else None, + allowance_blocks=job_route.allowance_blocks, + allowance_reservation_id=job_route.allowance_reservation_id, + allowance_job_value=job_route.allowance_job_value or 0, + ) + + my_wallet: bittensor_wallet.Wallet = settings.BITTENSOR_WALLET() + my_keypair = my_wallet.hotkey + ws_url = f"ws://{miner_ip}:{miner_port}/v0.1/validator_interface/{my_keypair.ss58_address}" + client = MinerClient( + url=ws_url, miner_hotkey=job_route.miner.hotkey_ss58, validator_keypair=my_keypair + ) + + def status_callback(status_update: JobStatusUpdate) -> None: + # TODO: use a db table to "transfer" the status updates + channel_layer = get_channel_layer() + async_to_sync(channel_layer.send)( + f"job_status_updates__{status_update.uuid}", + {"type": "job_status_update", "payload": status_update.model_dump(mode="json")}, + ) + + driver = SyncOrganicJobDriver( + client, + job, + job_request, + miner_hotkey=job_route.miner.hotkey_ss58, + my_keypair=my_keypair, + allowed_leeway=get_config("DYNAMIC_ORGANIC_JOB_ALLOWED_LEEWAY_TIME"), + reservation_time_limit=get_config("DYNAMIC_EXECUTOR_RESERVATION_TIME_LIMIT"), + executor_startup_time_limit=get_config("DYNAMIC_EXECUTOR_STARTUP_TIME_LIMIT"), + max_overall_time_limit=get_config("DYNAMIC_MAX_OVERALL_ORGANIC_JOB_TIME_LIMIT"), + status_callback=status_callback, + ) + driver.run() + return job diff --git a/validator/app/src/compute_horde_validator/validator/tasks.py b/validator/app/src/compute_horde_validator/validator/tasks.py index 1f6cb0203..47cd3e3c4 100644 --- a/validator/app/src/compute_horde_validator/validator/tasks.py +++ b/validator/app/src/compute_horde_validator/validator/tasks.py @@ -49,6 +49,7 @@ ) from .dynamic_config import aget_config from .models import MinerManifest +from .organic_jobs.miner_driver_sync import execute_organic_job_request_sync from .scoring import tasks as scoring_tasks # noqa if False: @@ -276,7 +277,11 @@ async def execute_organic_job_request_on_worker( def _execute_organic_job_on_worker(job_request: JsonValue, job_route: JsonValue) -> None: request: OrganicJobRequest = TypeAdapter(OrganicJobRequest).validate_python(job_request) route: JobRoute = TypeAdapter(JobRoute).validate_python(job_route) - async_to_sync(execute_organic_job_request)(request, route) + + if config.SYNC_ORGANIC_JOBS: + execute_organic_job_request_sync(request, route) + else: + async_to_sync(execute_organic_job_request)(request, route) @app.task(bind=True, max_retries=SLASH_COLLATERAL_TASK_MAX_RETRIES) diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_miner_driver_sync.py b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_miner_driver_sync.py new file mode 100644 index 000000000..b6b0755ac --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/tests/test_organic_jobs/test_miner_driver_sync.py @@ -0,0 +1,339 @@ +import uuid +from collections.abc import Callable +from functools import partial +from unittest import mock + +import pytest +from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS +from compute_horde.fv_protocol.facilitator_requests import OrganicJobRequest 
+from compute_horde.fv_protocol.validator_requests import JobStatusUpdate +from compute_horde.protocol_consts import JobStatus +from compute_horde.protocol_messages import ( + MinerToValidatorMessage, + V0AcceptJobRequest, + V0DeclineJobRequest, + V0ExecutionDoneRequest, + V0ExecutorFailedRequest, + V0ExecutorReadyRequest, + V0JobAcceptedReceiptRequest, + V0JobFailedRequest, + V0JobFinishedReceiptRequest, + V0JobFinishedRequest, + V0VolumesReadyRequest, + ValidatorToMinerMessage, +) + +from compute_horde_validator.validator.models import Miner +from compute_horde_validator.validator.organic_jobs.facilitator_client import OrganicJob +from compute_horde_validator.validator.organic_jobs.miner_driver_sync import ( + MinerClient, + SyncOrganicJobDriver, +) + +from ..helpers import get_dummy_job_request_v2 + + +class MockSyncMinerClient(MinerClient): + """Mock sync MinerClient that returns pre-configured messages.""" + + def __init__( + self, + *args, + messages_to_return: list[MinerToValidatorMessage], + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self._messages_to_return = list(messages_to_return) + self._sent_models: list[ValidatorToMinerMessage] = [] + self._recv_index = 0 + + def connect(self) -> None: + # Mock connection - set ws to a truthy value so send works + self.ws = mock.MagicMock() # type: ignore[assignment] + + def close(self) -> None: + # Mock close - do nothing + pass + + def send(self, msg: ValidatorToMinerMessage) -> None: + self._sent_models.append(msg) + + def recv(self) -> MinerToValidatorMessage: + if self._recv_index >= len(self._messages_to_return): + raise TimeoutError("No more messages to return") + msg = self._messages_to_return[self._recv_index] + self._recv_index += 1 + return msg + + def query_sent_models( + self, + condition: Callable[[ValidatorToMinerMessage], bool] | None = None, + model_class: type | None = None, + ) -> list[ValidatorToMinerMessage]: + result = [] + for model in self._sent_models: + if model_class is not None and not isinstance(model, model_class): + continue + if condition is not None and not condition(model): + continue + result.append(model) + return result + + +def create_job_request(job_uuid: str) -> OrganicJobRequest: + """Convert V2JobRequest to OrganicJobRequest for the sync driver.""" + v2_request = get_dummy_job_request_v2(job_uuid) + return OrganicJobRequest( + type="job.new", + uuid=job_uuid, + docker_image=v2_request.docker_image, + executor_class=v2_request.executor_class, + args=v2_request.args, + env=v2_request.env, + volume=v2_request.volume, + output_upload=v2_request.output_upload, + download_time_limit=v2_request.download_time_limit, + execution_time_limit=v2_request.execution_time_limit, + streaming_start_time_limit=v2_request.streaming_start_time_limit, + upload_time_limit=v2_request.upload_time_limit, + ) + + +@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True) +@pytest.mark.parametrize( + ( + "messages_from_miner", + "expected_job_status_updates", + "organic_job_status", + "expected_job_accepted_receipt", + "expected_job_finished_receipt", + ), + [ + # Timeout - no messages from miner + ( + [], + [JobStatus.FAILED], + OrganicJob.Status.FAILED, + False, + True, # Failure receipt is sent + ), + # Miner declines job + ( + [V0DeclineJobRequest], + [JobStatus.REJECTED], + OrganicJob.Status.FAILED, + False, + True, # Failure receipt is sent + ), + # Miner accepts but executor fails + ( + [ + V0AcceptJobRequest, + V0ExecutorFailedRequest, + ], + [JobStatus.ACCEPTED, 
JobStatus.HORDE_FAILED], + OrganicJob.Status.FAILED, + True, + True, # Failure receipt is sent + ), + # Miner accepts, executor ready, then timeout + ( + [ + V0AcceptJobRequest, + V0ExecutorReadyRequest, + ], + [JobStatus.ACCEPTED, JobStatus.EXECUTOR_READY, JobStatus.FAILED], + OrganicJob.Status.FAILED, + True, + True, # Failure receipt is sent + ), + # Full flow but job fails at execution + ( + [ + V0AcceptJobRequest, + V0ExecutorReadyRequest, + V0VolumesReadyRequest, + V0ExecutionDoneRequest, + V0JobFailedRequest, + ], + [ + JobStatus.ACCEPTED, + JobStatus.EXECUTOR_READY, + JobStatus.VOLUMES_READY, + JobStatus.EXECUTION_DONE, + JobStatus.FAILED, + ], + OrganicJob.Status.FAILED, + True, + True, # Failure receipt is sent + ), + # Full successful flow + ( + [ + V0AcceptJobRequest, + V0ExecutorReadyRequest, + V0VolumesReadyRequest, + V0ExecutionDoneRequest, + partial( + V0JobFinishedRequest, + docker_process_stdout="mocked stdout", + docker_process_stderr="mocked stderr", + artifacts={}, + ), + ], + [ + JobStatus.ACCEPTED, + JobStatus.EXECUTOR_READY, + JobStatus.VOLUMES_READY, + JobStatus.EXECUTION_DONE, + JobStatus.COMPLETED, + ], + OrganicJob.Status.COMPLETED, + True, + True, # Success receipt is sent + ), + ], +) +def test_sync_miner_driver( + messages_from_miner, + expected_job_status_updates, + organic_job_status, + expected_job_accepted_receipt, + expected_job_finished_receipt, + settings, +): + miner, _ = Miner.objects.get_or_create(hotkey="miner_client") + job_uuid = str(uuid.uuid4()) + job_request = create_job_request(job_uuid) + job = OrganicJob.objects.create( + job_uuid=job_uuid, + miner=miner, + miner_address="irrelevant", + miner_address_ip_version=4, + miner_port=9999, + executor_class=DEFAULT_EXECUTOR_CLASS, + job_description="User job from facilitator", + block=42, + ) + + # Build messages with job_uuid + messages = [] + for msg_factory in messages_from_miner: + if callable(msg_factory) and not isinstance(msg_factory, type): + # It's a partial or other callable + messages.append(msg_factory(job_uuid=job_uuid)) + else: + # It's a class + messages.append(msg_factory(job_uuid=job_uuid)) + + job_status_updates: list[JobStatusUpdate] = [] + + def track_job_status_updates(x: JobStatusUpdate): + job_status_updates.append(x) + + my_keypair = settings.BITTENSOR_WALLET().hotkey + + miner_client = MockSyncMinerClient( + url="ws://miner:9999/v0.1/validator_interface/test", + miner_hotkey="miner_hotkey", + validator_keypair=my_keypair, + messages_to_return=messages, + ) + + with mock.patch( + "compute_horde_validator.validator.organic_jobs.miner_driver_sync.supertensor" + ) as mock_supertensor: + mock_supertensor.return_value.list_validators.return_value = [] + + driver = SyncOrganicJobDriver( + miner_client, + job, + job_request, + miner_hotkey="miner_hotkey", + my_keypair=my_keypair, + allowed_leeway=5, + reservation_time_limit=10, + executor_startup_time_limit=10, + max_overall_time_limit=60, + status_callback=track_job_status_updates, + ) + driver.run() + + assert len(job_status_updates) == len(expected_job_status_updates), ( + f"Got {[u.status.value for u in job_status_updates]}, expected {[s.value for s in expected_job_status_updates]}" + ) + for job_status, expected_status in zip(job_status_updates, expected_job_status_updates): + assert job_status.status == expected_status + assert job_status.uuid == job_uuid + + job.refresh_from_db() + assert job.status == organic_job_status + + if organic_job_status == OrganicJob.Status.COMPLETED: + assert job.stdout == "mocked stdout" + assert 
job.stderr == "mocked stderr" + + if expected_job_accepted_receipt: + assert miner_client.query_sent_models(model_class=V0JobAcceptedReceiptRequest) + + if expected_job_finished_receipt: + assert miner_client.query_sent_models(model_class=V0JobFinishedReceiptRequest) + + +@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True) +def test_sync_miner_driver_connection_failure(settings): + """Test that connection failures are handled properly.""" + miner, _ = Miner.objects.get_or_create(hotkey="miner_client") + job_uuid = str(uuid.uuid4()) + job_request = create_job_request(job_uuid) + job = OrganicJob.objects.create( + job_uuid=job_uuid, + miner=miner, + miner_address="irrelevant", + miner_address_ip_version=4, + miner_port=9999, + executor_class=DEFAULT_EXECUTOR_CLASS, + job_description="User job from facilitator", + block=42, + ) + + my_keypair = settings.BITTENSOR_WALLET().hotkey + + miner_client = MockSyncMinerClient( + url="ws://miner:9999/v0.1/validator_interface/test", + miner_hotkey="miner_hotkey", + validator_keypair=my_keypair, + messages_to_return=[], + ) + + # Make connect raise an exception + def failing_connect(): + raise ConnectionRefusedError("Connection refused") + + miner_client.connect = failing_connect + + job_status_updates: list[JobStatusUpdate] = [] + + def track_job_status_updates(x: JobStatusUpdate): + job_status_updates.append(x) + + driver = SyncOrganicJobDriver( + miner_client, + job, + job_request, + miner_hotkey="miner_hotkey", + my_keypair=my_keypair, + allowed_leeway=5, + reservation_time_limit=10, + executor_startup_time_limit=10, + max_overall_time_limit=60, + status_callback=track_job_status_updates, + ) + driver.run() + + assert len(job_status_updates) == 1 + assert job_status_updates[0].status == JobStatus.FAILED + + job.refresh_from_db() + assert job.status == OrganicJob.Status.FAILED + assert "connection failed" in job.comment.lower() diff --git a/validator/uv.lock b/validator/uv.lock index 10ab3b3fb..a52b46d1e 100644 --- a/validator/uv.lock +++ b/validator/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = "==3.11.*" resolution-markers = [ "sys_platform == 'linux'", From 8b5ec642285d1092b9efdedf9ea69e69af09410a Mon Sep 17 00:00:00 2001 From: Enam Mijbah Noor Date: Mon, 2 Feb 2026 20:01:00 +0600 Subject: [PATCH 2/3] squash me: fix manifest fetching order --- .../app/src/compute_horde_validator/validator/job_excuses.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/validator/app/src/compute_horde_validator/validator/job_excuses.py b/validator/app/src/compute_horde_validator/validator/job_excuses.py index 3fc8ca8e9..69067b48f 100644 --- a/validator/app/src/compute_horde_validator/validator/job_excuses.py +++ b/validator/app/src/compute_horde_validator/validator/job_excuses.py @@ -100,7 +100,7 @@ async def get_expected_miner_executor_count( executor_class=executor_class, created_at__lte=check_time, ) - .order_by("created_at") + .order_by("-created_at") .only("online_executor_count") .afirst() ) @@ -127,7 +127,7 @@ def get_expected_miner_executor_count_sync( executor_class=executor_class, created_at__lte=check_time, ) - .order_by("created_at") + .order_by("-created_at") .only("online_executor_count") .first() ) From 88df5c766905761d8504e3aac204a1e7bfadf032 Mon Sep 17 00:00:00 2001 From: Enam Mijbah Noor Date: Wed, 4 Feb 2026 16:51:04 +0600 Subject: [PATCH 3/3] squash me: add docstrings --- .../organic_jobs/miner_driver_sync.py | 37 +++++++++++++++++-- 1 file changed, 34 
insertions(+), 3 deletions(-) diff --git a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver_sync.py b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver_sync.py index f91abb1e0..02eeb3877 100644 --- a/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver_sync.py +++ b/validator/app/src/compute_horde_validator/validator/organic_jobs/miner_driver_sync.py @@ -268,6 +268,22 @@ class DriverState(Enum): class SyncOrganicJobDriver: + """ + Drives an organic job through its lifecycle via a state machine. + + State flow: + CONNECT → RESERVE_EXECUTOR → WAIT_JOB_ACCEPTED → SEND_JOB_ACCEPTED_RECEIPT + → WAIT_EXECUTOR_READY → PREPARE_VOLUMES → WAIT_VOLUMES_READY + → [WAIT_STREAMING_JOB_READY] → WAIT_EXECUTION_DONE → COLLECT_RESULTS → COMPLETE + + Any state can transition to FAILED if an error occurs or a timeout is reached. + The WAIT_STREAMING_JOB_READY state is only entered for streaming jobs. + + The driver communicates with the miner via MinerClient and reports status updates + to the facilitator via the status_callback. Receipts are created and persisted + at key points (job started, accepted, finished) for the receipts protocol. + """ + def __init__( self, miner_client: MinerClient, @@ -505,9 +521,16 @@ def _wait_for_message( timeout_status_reason: HordeFailureReason, ) -> _MinerMsgT | DriverState: """ - Waits for a specific message type from the miner within a given timeout period. - Returns the received message if successful. - Returns a terminal DriverState if the timeout is reached or if an error message is received. + Waits for a specific message type from the miner within a timeout. + + Returns either: + - The expected message type on success + - DriverState.FAILED if timeout/error occurs (job is marked failed, events recorded) + + Handles protocol-level errors (GenericError, UnauthorizedError) and job-level + failures (V0DeclineJobRequest, V0JobFailedRequest, V0HordeFailedRequest) by + transitioning to the FAILED state. Ignores manifest requests and messages + for other jobs. """ deadline = time.time() + timeout while True: @@ -607,6 +630,10 @@ def _wait_for_message( ) def run(self) -> None: + """ + Execute the job lifecycle. Handles exceptions by marking the job as failed + and always ensures cleanup (sending failure receipt, undoing allowance, closing connection). + """ try: self._run() except Exception as exc: @@ -685,6 +712,10 @@ def _undo_allowance_if_failed(self) -> None: ) def _run(self) -> None: + """ + Main state machine loop. Each iteration executes the handler for the current + state, which returns the next state. Terminates when COMPLETE or FAILED. + """ while True: match self._state: case DriverState.CONNECT: