Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion validator/app/src/compute_horde_validator/validator/pylon.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import ipaddress

from django.conf import settings
from pylon_client import Hotkey, Neuron, PylonClient

Expand All @@ -24,5 +26,7 @@ def get_serving_hotkeys(neurons: list[Neuron]) -> list[Hotkey]:
list[Hotkey]: List of Hotkey objects.
"""
return [
neuron.hotkey for neuron in neurons if neuron.axon_info and neuron.axon_info.ip != "0.0.0.0"
neuron.hotkey
for neuron in neurons
if neuron.axon_info and neuron.axon_info.ip != ipaddress.IPv4Address("0.0.0.0")
]
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from datetime import timedelta
from typing import assert_never

from compute_horde.blockchain.block_cache import aget_current_block
from compute_horde.executor_class import EXECUTOR_CLASS
from compute_horde.fv_protocol.facilitator_requests import (
OrganicJobRequest,
Expand All @@ -20,11 +19,11 @@
from compute_horde_validator.validator.dynamic_config import aget_config
from compute_horde_validator.validator.models import (
ComputeTimeAllowance,
MetagraphSnapshot,
Miner,
MinerManifest,
MinerPreliminaryReservation,
)
from compute_horde_validator.validator.pylon import pylon_client
from compute_horde_validator.validator.routing.base import RoutingBase
from compute_horde_validator.validator.routing.types import (
AllMinersBusy,
Expand Down Expand Up @@ -109,13 +108,8 @@ async def _pick_miner_for_job_v2(request: V2JobRequest) -> JobRoute:
request.download_time_limit + request.execution_time_limit + request.upload_time_limit
)

block: int | None = None
try:
block = (await MetagraphSnapshot.aget_latest()).block
except Exception as exc:
logger.warning(f"Failed to get latest metagraph snapshot: {exc}")
block = await aget_current_block()
assert block is not None, "Failed to get current block from cache or subtensor."
block = pylon_client().get_latest_block()
assert block is not None, "Failed to get current block from pylon"

if not await aget_config("DYNAMIC_ALLOW_CROSS_CYCLE_ORGANIC_JOBS"):
seconds_remaining_in_cycle = _get_seconds_remaining_in_current_cycle(block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from compute_horde_validator.validator.organic_jobs.miner_driver import execute_organic_job_request

from ...tests.helpers import mocked_pylon_client

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -85,3 +87,13 @@ def miner_keypair():
return bittensor_wallet.Keypair.create_from_mnemonic(
"almost fatigue race slim picnic mass better clog deal solve already champion"
)


@pytest.fixture
def patch_pylon_client():
mock_pylon_client = mocked_pylon_client()
with patch(
"compute_horde_validator.validator.routing.default.pylon_client",
return_value=mock_pylon_client,
):
yield mock_pylon_client
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from compute_horde_validator.validator.models import (
ComputeTimeAllowance,
Cycle,
MetagraphSnapshot,
Miner,
MinerBlacklist,
MinerManifest,
Expand All @@ -33,6 +32,8 @@
)
from compute_horde_validator.validator.utils import TRUSTED_MINER_FAKE_KEY

from ...tests.helpers import mock_pylon_neuron

JOB_REQUEST = V2JobRequest(
uuid=str(uuid.uuid4()),
executor_class=DEFAULT_EXECUTOR_CLASS,
Expand Down Expand Up @@ -64,7 +65,7 @@ def validator(settings):


@pytest.fixture(autouse=True)
def setup_db(miners, validator):
def setup_db(miners, validator, patch_pylon_client): # noqa
now = timezone.now()
cycle = Cycle.objects.create(start=1, stop=2)
batch = SyntheticJobBatch.objects.create(block=1, created_at=now, cycle=cycle)
Expand All @@ -77,16 +78,15 @@ def setup_db(miners, validator):
executor_count=5,
online_executor_count=5,
)
MetagraphSnapshot.objects.create(
id=MetagraphSnapshot.SnapshotType.LATEST,
block=1,
alpha_stake=[2000] + [0] * len(miners),
tao_stake=[2000] + [0] * len(miners),
stake=[2000] + [0] * len(miners),
uids=list(range(len(miners) + 1)),
hotkeys=[validator.hotkey] + [m.hotkey for m in miners],
serving_hotkeys=["miner_0", "miner_1"],
)

neurons = [mock_pylon_neuron(0, validator.hotkey, 2000)] # validator
for i, miner in enumerate(miners):
neurons.append(mock_pylon_neuron(i + 1, miner.hotkey, 0, i < 2))

metagraph_data = {"block": 1, "block_hash": "0x123", "neurons": {n.hotkey: n for n in neurons}}
patch_pylon_client.override("get_metagraph", metagraph_data)
patch_pylon_client.override("get_latest_block", 1)

for miner in miners:
ComputeTimeAllowance.objects.create(
cycle=cycle,
Expand Down Expand Up @@ -353,17 +353,6 @@ async def test_pick_miner_for_job__collateral_tiebreak_on_equal_allowance_percen
assert job_route.miner.hotkey_ss58 == miner2.hotkey


@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
@patch("compute_horde_validator.validator.routing.default.aget_current_block")
async def test_pick_miner_for_job__use_subtensor_for_block_on_cache_miss(
mocked_aget_current_block, miners
):
mocked_aget_current_block.return_value = 1
await MetagraphSnapshot.objects.all().adelete()
await routing().pick_miner_for_job_request(JOB_REQUEST)


@pytest.mark.django_db(transaction=True)
@pytest.mark.override_config(DYNAMIC_ALLOW_CROSS_CYCLE_ORGANIC_JOBS=True)
@patch("compute_horde_validator.validator.routing.default._get_seconds_remaining_in_current_cycle")
Expand Down
30 changes: 19 additions & 11 deletions validator/app/src/compute_horde_validator/validator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from django.utils.timezone import now
from numpy.typing import NDArray
from pydantic import JsonValue, TypeAdapter
from pylon_common.models import Metagraph

from compute_horde_validator.celery import app
from compute_horde_validator.validator import collateral
Expand All @@ -63,7 +64,7 @@
drive_organic_job,
execute_organic_job_request,
)
from compute_horde_validator.validator.pylon import pylon_client
from compute_horde_validator.validator.pylon import get_serving_hotkeys, pylon_client
from compute_horde_validator.validator.routing.types import JobRoute
from compute_horde_validator.validator.s3 import (
download_prompts_from_s3_url,
Expand Down Expand Up @@ -1271,17 +1272,23 @@ async def get_manifests_from_miners(

@app.task
def set_compute_time_allowances() -> None:
metagraph = MetagraphSnapshot.get_cycle_start()
current_cycle = Cycle.from_block(metagraph.block, netuid=settings.BITTENSOR_NETUID)
# Get current block to determine the cycle
current_block = pylon_client().get_latest_block()
current_cycle = Cycle.from_block(current_block, netuid=settings.BITTENSOR_NETUID)
if current_cycle.set_compute_time_allowance:
logger.debug(f"allowances already calculated for cycle {current_cycle}")
return

# Get metagraph at cycle start block for consistent allowance calculation
cycle_start_metagraph = pylon_client().get_metagraph(block=current_cycle.start)

set_success = False
try:
current_cycle.set_compute_time_allowance = True
current_cycle.save()
set_success = async_to_sync(_set_compute_time_allowances)(metagraph, current_cycle)
set_success = async_to_sync(_set_compute_time_allowances)(
cycle_start_metagraph, current_cycle
)

except Exception:
msg = "Failed to set compute time allowances: {e}"
Expand All @@ -1298,7 +1305,7 @@ def set_compute_time_allowances() -> None:
current_cycle.save()


async def _set_compute_time_allowances(metagraph: MetagraphSnapshot, cycle: Cycle) -> bool:
async def _set_compute_time_allowances(metagraph: Metagraph, cycle: Cycle) -> bool:
"""
Calculate and save allowances for all validator-miner pairs at the beginning of a cycle.
"""
Expand All @@ -1308,14 +1315,15 @@ async def _set_compute_time_allowances(metagraph: MetagraphSnapshot, cycle: Cycl
"cycle_stop": cycle.stop,
}

miners_hotkeys = metagraph.get_serving_hotkeys()
metagraph = pylon_client().get_metagraph()
miners_hotkeys = get_serving_hotkeys(metagraph.get_neurons())
if len(miners_hotkeys) == 0:
msg = "No miners in the last metagraph snapshot - will NOT set compute time allowances"
await save_compute_time_allowance_event(SystemEvent.EventSubType.GIVING_UP, msg, data)
logger.warning(msg)
return False

total_stake = metagraph.get_total_validator_stake()
total_stake = sum(neuron.stake for neuron in metagraph.get_neurons())
if total_stake is None or total_stake <= 0.0:
msg = "Total stake for last_metagraph snapshot is 0 - skipping compute time allowances"
await save_compute_time_allowance_event(SystemEvent.EventSubType.GIVING_UP, msg, data)
Expand All @@ -1325,10 +1333,10 @@ async def _set_compute_time_allowances(metagraph: MetagraphSnapshot, cycle: Cycl
# compute stake proportion for each validator
hotkey_to_stake_proportion = {}
validators_hotkeys = []
for hotkey, stake in zip(metagraph.hotkeys, metagraph.stake):
if stake > MIN_VALIDATOR_STAKE:
validators_hotkeys.append(hotkey)
hotkey_to_stake_proportion[hotkey] = stake / total_stake
for neuron in metagraph.get_neurons():
if neuron.stake >= MIN_VALIDATOR_STAKE:
validators_hotkeys.append(neuron.hotkey)
hotkey_to_stake_proportion[neuron.hotkey] = neuron.stake / total_stake
logger.debug(f"Validator stake proportion: {hotkey_to_stake_proportion}")
data["validator_stake_proportion"] = hotkey_to_stake_proportion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pytest_mock import MockerFixture

from ..organic_jobs.miner_driver import execute_organic_job_request
from .helpers import MockNeuron, MockSyntheticMinerClient
from .helpers import MockNeuron, MockSyntheticMinerClient, mocked_pylon_client

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -151,6 +151,15 @@ def run_uuid():
return str(uuid.uuid4())


@pytest.fixture()
def patch_pylon_client():
mock_pylon_client = mocked_pylon_client()
with patch(
"compute_horde_validator.validator.tasks.pylon_client", return_value=mock_pylon_client
):
yield mock_pylon_client


# NOTE: Use this fixture when you need to find dangling asyncio tasks. It is currently commented
# because redis channels layers keeps dangling tasks, that makes the tests fail -_-
# @pytest_asyncio.fixture(autouse=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import glob
import ipaddress
import logging
import numbers
import os
Expand Down Expand Up @@ -28,6 +29,7 @@
from constance.base import Config
from django.conf import settings
from pydantic import TypeAdapter
from pylon_client import AxonInfo, Neuron, PylonClient

from compute_horde_validator.validator.models import SystemEvent
from compute_horde_validator.validator.organic_jobs.miner_client import MinerClient
Expand All @@ -39,6 +41,36 @@
logger = logging.getLogger(__name__)


def mock_pylon_neuron(uid: int, hotkey: str, stake: float, serving: bool = True) -> Neuron:
return Neuron(
uid=uid,
coldkey=f"coldkey_{uid}",
hotkey=hotkey,
active=True,
axon_info=AxonInfo(
ip=ipaddress.IPv4Address("127.0.0.1" if serving else "0.0.0.0"),
port=8080,
protocol=4,
),
stake=stake,
rank=0.1,
emission=0.01,
incentive=0.1,
consensus=0.9,
trust=0.9,
validator_trust=0.9,
dividends=0.01,
last_update=12340,
validator_permit=("validator" in hotkey),
pruning_score=0,
)


def mocked_pylon_client():
mock_data_path = os.path.join(os.path.dirname(__file__), "pylon_mock_data.json")
return PylonClient(mock_data_path=mock_data_path)


def throw_error(*args):
raise Exception("Error thrown for testing")

Expand Down
Loading