diff --git a/validator/app/src/compute_horde_validator/validator/allowance/base.py b/validator/app/src/compute_horde_validator/validator/allowance/base.py index ddb19863d..8b99d400d 100644 --- a/validator/app/src/compute_horde_validator/validator/allowance/base.py +++ b/validator/app/src/compute_horde_validator/validator/allowance/base.py @@ -117,6 +117,13 @@ def get_current_block(self) -> int: """ pass + @abstractmethod + def get_latest_finalized_block(self) -> int: + """ + Return the latest finalized block number (current block minus finalization offset). + """ + pass + @abstractmethod def get_manifests(self) -> dict[ss58_address, dict[ExecutorClass, int]]: """ diff --git a/validator/app/src/compute_horde_validator/validator/allowance/default.py b/validator/app/src/compute_horde_validator/validator/allowance/default.py index b4c7ed0f2..17d3e8d33 100644 --- a/validator/app/src/compute_horde_validator/validator/allowance/default.py +++ b/validator/app/src/compute_horde_validator/validator/allowance/default.py @@ -39,6 +39,9 @@ def validate_foreign_receipt(self): def get_current_block(self) -> int: return supertensor().get_current_block() + def get_latest_finalized_block(self) -> int: + return supertensor().get_latest_finalized_block() + def get_manifests(self) -> dict[ss58_address, dict[ExecutorClass, int]]: return manifests.get_current_manifests() diff --git a/validator/app/src/compute_horde_validator/validator/allowance/settings.py b/validator/app/src/compute_horde_validator/validator/allowance/settings.py index eb7282252..7a6c04dd3 100644 --- a/validator/app/src/compute_horde_validator/validator/allowance/settings.py +++ b/validator/app/src/compute_horde_validator/validator/allowance/settings.py @@ -9,6 +9,9 @@ BLOCK_EXPIRY = 722 # A job started at block N can be paid for block N-BLOCK_EXPIRY and newer ones +BLOCK_FINALIZATION_OFFSET = 5 +# The offset from the current block that we consider to be "finalized" + MANIFEST_FETCHING_TIMEOUT = 30.0 MAX_JOB_RUN_TIME = 60 * 60.0 diff --git a/validator/app/src/compute_horde_validator/validator/allowance/tasks.py b/validator/app/src/compute_horde_validator/validator/allowance/tasks.py index e7e6e9558..01b8f2c9a 100644 --- a/validator/app/src/compute_horde_validator/validator/allowance/tasks.py +++ b/validator/app/src/compute_horde_validator/validator/allowance/tasks.py @@ -45,7 +45,6 @@ def scan_blocks_and_calculate_allowance( if not AllowanceMinerManifest.objects.exists(): logger.warning("No miner manifests found, skipping allowance calculation") return - current_block = supertensor().get_current_block() with transaction.atomic(using=settings.DEFAULT_DB_ALIAS): try: with Lock(LockType.ALLOWANCE_FETCHING, LOCK_WAIT_TIMEOUT, settings.DEFAULT_DB_ALIAS): @@ -58,7 +57,6 @@ def scan_blocks_and_calculate_allowance( cm = contextlib.nullcontext(backfilling_supertensor) with cm as backfilling_supertensor: blocks.backfill_blocks_if_necessary( - current_block, MAX_RUN_TIME, report_allowance_checkpoint.delay, backfilling_supertensor, @@ -67,11 +65,7 @@ def scan_blocks_and_calculate_allowance( if time_left < 0: raise blocks.TimesUpError - blocks.livefill_blocks( - current_block, - time_left, - report_allowance_checkpoint.delay, - ) + blocks.livefill_blocks(time_left, report_allowance_checkpoint.delay) except Locked: logger.debug("Another thread already fetching blocks") except blocks.TimesUpError: diff --git a/validator/app/src/compute_horde_validator/validator/allowance/tests/mockchain.py b/validator/app/src/compute_horde_validator/validator/allowance/tests/mockchain.py index cfca1782c..69b4ed782 100644 --- a/validator/app/src/compute_horde_validator/validator/allowance/tests/mockchain.py +++ b/validator/app/src/compute_horde_validator/validator/allowance/tests/mockchain.py @@ -23,6 +23,7 @@ SYNC_MANIFESTS_INTERVAL = 25 MANIFEST_CHANGE_INTERVAL = 300 MANIFEST_FETCHING_TIMEOUT = 1.0 +FINALIZATION_OFFSET = 5 MINER_HOTKEYS = { **{ @@ -291,6 +292,9 @@ def inc_block_number(self): def get_current_block(self): return self.block_number + def get_latest_finalized_block(self): + return self.get_current_block() - FINALIZATION_OFFSET + def get_shielded_neurons(self): return list_neurons(block_number_, with_shield=False) diff --git a/validator/app/src/compute_horde_validator/validator/allowance/tests/test_tasks.py b/validator/app/src/compute_horde_validator/validator/allowance/tests/test_tasks.py index a17c826e6..4d22f23e5 100644 --- a/validator/app/src/compute_horde_validator/validator/allowance/tests/test_tasks.py +++ b/validator/app/src/compute_horde_validator/validator/allowance/tests/test_tasks.py @@ -8,7 +8,10 @@ from freezegun import freeze_time from compute_horde_validator.validator.allowance import tasks as allowance_tasks -from compute_horde_validator.validator.allowance.tests.mockchain import set_block_number +from compute_horde_validator.validator.allowance.tests.mockchain import ( + FINALIZATION_OFFSET, + set_block_number, +) from compute_horde_validator.validator.allowance.utils import blocks, manifests from compute_horde_validator.validator.allowance.utils.supertensor import supertensor @@ -25,7 +28,7 @@ def process_block_allowance_with_reporting_side_effect(*a, **kw): with set_block_number(1000, oldest_reachable_block=1005): manifests.sync_manifests() - with set_block_number(1013, oldest_reachable_block=1005): + with set_block_number(1013 + FINALIZATION_OFFSET, oldest_reachable_block=1005): backfilling_supertensor = supertensor() with ( @@ -36,7 +39,7 @@ def process_block_allowance_with_reporting_side_effect(*a, **kw): "sleep", side_effect=lambda *a, **kw: supertensor().inc_block_number(), # type: ignore ), - set_block_number(1013, oldest_reachable_block=1005), + set_block_number(1013 + FINALIZATION_OFFSET, oldest_reachable_block=1005), mock.patch.object( blocks, "process_block_allowance_with_reporting", diff --git a/validator/app/src/compute_horde_validator/validator/allowance/utils/blocks.py b/validator/app/src/compute_horde_validator/validator/allowance/utils/blocks.py index 9973f07d1..d69309357 100644 --- a/validator/app/src/compute_horde_validator/validator/allowance/utils/blocks.py +++ b/validator/app/src/compute_horde_validator/validator/allowance/utils/blocks.py @@ -78,7 +78,7 @@ def time_left(self): return self.max_run_time - (time.time() - self.start_time) -def wait_for_block(target_block: int, timeout_seconds: float): +def wait_for_block(target_block: int, timeout_seconds: float, wait_for_finalization: bool = False): timeout_seconds = max(timeout_seconds, MIN_BLOCK_WAIT_TIME) start = time.time() @@ -89,7 +89,10 @@ def wait_for_block(target_block: int, timeout_seconds: float): ) raise TimesUpError(f"Timeout waiting for block {target_block}") try: - current_block = supertensor().get_current_block() + if wait_for_finalization: + current_block = supertensor().get_latest_finalized_block() + else: + current_block = supertensor().get_current_block() except CannotGetCurrentBlock: time.sleep(0.1) continue @@ -339,11 +342,11 @@ def report_checkpoint(block_number_lt: int, block_number_gte: int): def backfill_blocks_if_necessary( - current_block: int, max_run_time: float | int, report_callback: Callable[[int, int], Any] | None = None, backfilling_supertensor: SuperTensor | None = None, ): + current_block = supertensor().get_latest_finalized_block() if backfilling_supertensor is None: backfilling_supertensor = supertensor() timer = Timer(max_run_time) @@ -366,14 +369,14 @@ def backfill_blocks_if_necessary( def livefill_blocks( - current_block: int, max_run_time: float | int, report_callback: Callable[[int, int], Any] | None = None, ): + current_block = supertensor().get_latest_finalized_block() timer = Timer(max_run_time) try: while True: - wait_for_block(current_block + 1, timer.time_left()) + wait_for_block(current_block + 1, timer.time_left(), wait_for_finalization=True) current_block += 1 process_block_allowance_with_reporting(current_block, supertensor(), live=True) diff --git a/validator/app/src/compute_horde_validator/validator/allowance/utils/supertensor.py b/validator/app/src/compute_horde_validator/validator/allowance/utils/supertensor.py index 9ad7f7c98..33af4a1d6 100644 --- a/validator/app/src/compute_horde_validator/validator/allowance/utils/supertensor.py +++ b/validator/app/src/compute_horde_validator/validator/allowance/utils/supertensor.py @@ -22,6 +22,8 @@ from compute_horde_validator.validator.allowance.types import ValidatorModel +from .. import settings + DEFAULT_TIMEOUT = 30.0 # Context variables for bittensor and subnet @@ -114,6 +116,9 @@ def get_shielded_neurons(self) -> list[turbobt.Neuron]: ... @abc.abstractmethod def get_current_block(self) -> int: ... + @abc.abstractmethod + def get_latest_finalized_block(self) -> int: ... + @abc.abstractmethod def wallet(self) -> bittensor_wallet.Wallet: ... @@ -262,12 +267,16 @@ def get_current_block(self) -> int: current_block = get_current_block() except websockets.exceptions.ConcurrencyError as ex: raise CannotGetCurrentBlock("Cannot get current block from blockchain") from ex - return current_block - 5 + return current_block def close(self): self.loop.run_until_complete(self.bittensor.close()) if self.archive_bittensor is not None: self.loop.run_until_complete(self.archive_bittensor.close()) + return get_current_block() + + def get_latest_finalized_block(self) -> int: + return self.get_current_block() - settings.BLOCK_FINALIZATION_OFFSET N_THREADS = 10