Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ def get_current_block(self) -> int:
"""
pass

@abstractmethod
def get_latest_finalized_block(self) -> int:
"""
Return the latest finalized block number (current block minus finalization offset).
"""
pass

@abstractmethod
def get_manifests(self) -> dict[ss58_address, dict[ExecutorClass, int]]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def validate_foreign_receipt(self):
def get_current_block(self) -> int:
return supertensor().get_current_block()

def get_latest_finalized_block(self) -> int:
return supertensor().get_latest_finalized_block()

def get_manifests(self) -> dict[ss58_address, dict[ExecutorClass, int]]:
return manifests.get_current_manifests()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
BLOCK_EXPIRY = 722
# A job started at block N can be paid for block N-BLOCK_EXPIRY and newer ones

BLOCK_FINALIZATION_OFFSET = 5
# The offset from the current block that we consider to be "finalized"

MANIFEST_FETCHING_TIMEOUT = 30.0

MAX_JOB_RUN_TIME = 60 * 60.0
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def scan_blocks_and_calculate_allowance(
if not AllowanceMinerManifest.objects.exists():
logger.warning("No miner manifests found, skipping allowance calculation")
return
current_block = supertensor().get_current_block()
with transaction.atomic(using=settings.DEFAULT_DB_ALIAS):
try:
with Lock(LockType.ALLOWANCE_FETCHING, LOCK_WAIT_TIMEOUT, settings.DEFAULT_DB_ALIAS):
Expand All @@ -58,7 +57,6 @@ def scan_blocks_and_calculate_allowance(
cm = contextlib.nullcontext(backfilling_supertensor)
with cm as backfilling_supertensor:
blocks.backfill_blocks_if_necessary(
current_block,
MAX_RUN_TIME,
report_allowance_checkpoint.delay,
backfilling_supertensor,
Expand All @@ -67,11 +65,7 @@ def scan_blocks_and_calculate_allowance(
if time_left < 0:
raise blocks.TimesUpError

blocks.livefill_blocks(
current_block,
time_left,
report_allowance_checkpoint.delay,
)
blocks.livefill_blocks(time_left, report_allowance_checkpoint.delay)
except Locked:
logger.debug("Another thread already fetching blocks")
except blocks.TimesUpError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
SYNC_MANIFESTS_INTERVAL = 25
MANIFEST_CHANGE_INTERVAL = 300
MANIFEST_FETCHING_TIMEOUT = 1.0
FINALIZATION_OFFSET = 5

MINER_HOTKEYS = {
**{
Expand Down Expand Up @@ -291,6 +292,9 @@ def inc_block_number(self):
def get_current_block(self):
return self.block_number

def get_latest_finalized_block(self):
return self.get_current_block() - FINALIZATION_OFFSET

def get_shielded_neurons(self):
return list_neurons(block_number_, with_shield=False)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from freezegun import freeze_time

from compute_horde_validator.validator.allowance import tasks as allowance_tasks
from compute_horde_validator.validator.allowance.tests.mockchain import set_block_number
from compute_horde_validator.validator.allowance.tests.mockchain import (
FINALIZATION_OFFSET,
set_block_number,
)
from compute_horde_validator.validator.allowance.utils import blocks, manifests
from compute_horde_validator.validator.allowance.utils.supertensor import supertensor

Expand All @@ -25,7 +28,7 @@ def process_block_allowance_with_reporting_side_effect(*a, **kw):
with set_block_number(1000, oldest_reachable_block=1005):
manifests.sync_manifests()

with set_block_number(1013, oldest_reachable_block=1005):
with set_block_number(1013 + FINALIZATION_OFFSET, oldest_reachable_block=1005):
backfilling_supertensor = supertensor()

with (
Expand All @@ -36,7 +39,7 @@ def process_block_allowance_with_reporting_side_effect(*a, **kw):
"sleep",
side_effect=lambda *a, **kw: supertensor().inc_block_number(), # type: ignore
),
set_block_number(1013, oldest_reachable_block=1005),
set_block_number(1013 + FINALIZATION_OFFSET, oldest_reachable_block=1005),
mock.patch.object(
blocks,
"process_block_allowance_with_reporting",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def time_left(self):
return self.max_run_time - (time.time() - self.start_time)


def wait_for_block(target_block: int, timeout_seconds: float):
def wait_for_block(target_block: int, timeout_seconds: float, wait_for_finalization: bool = False):
timeout_seconds = max(timeout_seconds, MIN_BLOCK_WAIT_TIME)
start = time.time()

Expand All @@ -89,7 +89,10 @@ def wait_for_block(target_block: int, timeout_seconds: float):
)
raise TimesUpError(f"Timeout waiting for block {target_block}")
try:
current_block = supertensor().get_current_block()
if wait_for_finalization:
current_block = supertensor().get_latest_finalized_block()
else:
current_block = supertensor().get_current_block()
except CannotGetCurrentBlock:
time.sleep(0.1)
continue
Expand Down Expand Up @@ -339,11 +342,11 @@ def report_checkpoint(block_number_lt: int, block_number_gte: int):


def backfill_blocks_if_necessary(
current_block: int,
max_run_time: float | int,
report_callback: Callable[[int, int], Any] | None = None,
backfilling_supertensor: SuperTensor | None = None,
):
current_block = supertensor().get_latest_finalized_block()
if backfilling_supertensor is None:
backfilling_supertensor = supertensor()
timer = Timer(max_run_time)
Expand All @@ -366,14 +369,14 @@ def backfill_blocks_if_necessary(


def livefill_blocks(
current_block: int,
max_run_time: float | int,
report_callback: Callable[[int, int], Any] | None = None,
):
current_block = supertensor().get_latest_finalized_block()
timer = Timer(max_run_time)
try:
while True:
wait_for_block(current_block + 1, timer.time_left())
wait_for_block(current_block + 1, timer.time_left(), wait_for_finalization=True)
current_block += 1

process_block_allowance_with_reporting(current_block, supertensor(), live=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

from compute_horde_validator.validator.allowance.types import ValidatorModel

from .. import settings

DEFAULT_TIMEOUT = 30.0

# Context variables for bittensor and subnet
Expand Down Expand Up @@ -114,6 +116,9 @@ def get_shielded_neurons(self) -> list[turbobt.Neuron]: ...
@abc.abstractmethod
def get_current_block(self) -> int: ...

@abc.abstractmethod
def get_latest_finalized_block(self) -> int: ...

@abc.abstractmethod
def wallet(self) -> bittensor_wallet.Wallet: ...

Expand Down Expand Up @@ -262,12 +267,16 @@ def get_current_block(self) -> int:
current_block = get_current_block()
except websockets.exceptions.ConcurrencyError as ex:
raise CannotGetCurrentBlock("Cannot get current block from blockchain") from ex
return current_block - 5
return current_block

def close(self):
self.loop.run_until_complete(self.bittensor.close())
if self.archive_bittensor is not None:
self.loop.run_until_complete(self.archive_bittensor.close())
return get_current_block()

def get_latest_finalized_block(self) -> int:
return self.get_current_block() - settings.BLOCK_FINALIZATION_OFFSET


N_THREADS = 10
Expand Down
Loading