diff --git a/compute_horde/compute_horde/manifest_utils.py b/compute_horde/compute_horde/manifest_utils.py new file mode 100644 index 000000000..2ff708e81 --- /dev/null +++ b/compute_horde/compute_horde/manifest_utils.py @@ -0,0 +1,83 @@ +""" +Utilities for working with miner manifests and knowledge commitments. + +These functions are shared between miner and validator for consistent +manifest format and parsing. +""" + +import logging + +from compute_horde_core.executor_class import ExecutorClass + +logger = logging.getLogger(__name__) + +MAX_COMMITMENT_LENGTH = 128 + + +def format_manifest_commitment(manifest: dict[ExecutorClass, int]) -> str: + """ + Converts manifest dict to commitment string format. + + Example: "spin_up-4min.gpu-24gb=2;always_on.gpu-24gb=3" + + - Sorted by executor class name for consistency + - No trailing semicolon + - Max 128 chars + + Raises: + ValueError: If the resulting string exceeds MAX_COMMITMENT_LENGTH + """ + if not manifest: + return "" + + # Sort by executor class value (string) for consistency + sorted_items = sorted(manifest.items(), key=lambda x: x[0].value) + + # Format as key=value pairs joined by semicolon + parts = [f"{executor_class.value}={count}" for executor_class, count in sorted_items] + commitment_string = ";".join(parts) + + if len(commitment_string) > MAX_COMMITMENT_LENGTH: + raise ValueError( + f"Commitment string length {len(commitment_string)} exceeds maximum {MAX_COMMITMENT_LENGTH}" + ) + + return commitment_string + + +def parse_commitment_string(commitment: str) -> dict[ExecutorClass, int]: + """ + Parses commitment string back to manifest dict. + + Returns empty dict if invalid/empty. + """ + if not commitment or not commitment.strip(): + return {} + + manifest = {} + try: + # Split by semicolon + pairs = commitment.split(";") + for pair in pairs: + if "=" not in pair: + logger.warning(f"Invalid commitment pair (missing '='): {pair}") + continue + + key, value = pair.split("=", 1) + key = key.strip() + value = value.strip() + + # Try to parse as ExecutorClass + try: + executor_class = ExecutorClass(key) + count = int(value) + manifest[executor_class] = count + except (ValueError, KeyError) as e: + logger.warning(f"Invalid commitment pair {pair}: {e}") + continue + + except Exception as e: + logger.error(f"Error parsing commitment string '{commitment}': {e}") + return {} + + return manifest diff --git a/miner/app/src/compute_horde_miner/miner/manifest_commitment.py b/miner/app/src/compute_horde_miner/miner/manifest_commitment.py new file mode 100644 index 000000000..2268bb44c --- /dev/null +++ b/miner/app/src/compute_horde_miner/miner/manifest_commitment.py @@ -0,0 +1,77 @@ +import logging + +import bittensor +from compute_horde.manifest_utils import ( + format_manifest_commitment, + parse_commitment_string, +) +from compute_horde_core.executor_class import ExecutorClass + +logger = logging.getLogger(__name__) + +# Re-export for backward compatibility +__all__ = [ + "format_manifest_commitment", + "parse_commitment_string", + "has_manifest_changed", + "commit_manifest_to_subtensor", +] + + +def has_manifest_changed( + current_manifest: dict[ExecutorClass, int], chain_commitment: str | None +) -> bool: + """ + Compare current manifest with on-chain commitment. + + Returns True if different or if no commitment exists. + """ + if not chain_commitment: + return True + + chain_manifest = parse_commitment_string(chain_commitment) + return current_manifest != chain_manifest + + +def commit_manifest_to_subtensor( + manifest: dict[ExecutorClass, int], + wallet: bittensor.wallet, + subtensor: bittensor.subtensor, + netuid: int, +) -> bool: + """ + Commits manifest to knowledge commitments. + + Returns True if successful, False otherwise. + + Steps: + 1. Format manifest to commitment string + 2. Validate length <= 128 chars + 3. Call subtensor.commit(wallet, netuid, commitment_string) + 4. Handle rate limiting (100 blocks) + + Note: Empty manifests are allowed to support "pausing" a miner. + """ + try: + # Format manifest to commitment string (empty manifests result in empty string) + commitment_string = format_manifest_commitment(manifest) + + logger.info(f"Committing manifest to chain: {commitment_string or '(empty)'}") + + # Commit to subtensor (empty string is valid for pausing) + # Note: subtensor.commit() is rate limited to 100 blocks + success = subtensor.commit(wallet, netuid, commitment_string) + + if success: + logger.info("Successfully committed manifest to chain") + return True + else: + logger.warning("Failed to commit manifest to chain") + return False + + except ValueError as e: + logger.error(f"Invalid manifest format: {e}") + return False + except Exception as e: + logger.error(f"Error committing manifest to subtensor: {e}", exc_info=True) + return False diff --git a/miner/app/src/compute_horde_miner/miner/tasks.py b/miner/app/src/compute_horde_miner/miner/tasks.py index 4a44ee112..4364e645e 100644 --- a/miner/app/src/compute_horde_miner/miner/tasks.py +++ b/miner/app/src/compute_horde_miner/miner/tasks.py @@ -1,5 +1,6 @@ import datetime +import bittensor from asgiref.sync import async_to_sync from celery.utils.log import get_task_logger from compute_horde.dynamic_config import fetch_dynamic_configs_from_contract, sync_dynamic_config @@ -11,6 +12,11 @@ from compute_horde_miner.celery import app from compute_horde_miner.miner import eviction, quasi_axon +from compute_horde_miner.miner.executor_manager import current +from compute_horde_miner.miner.manifest_commitment import ( + commit_manifest_to_subtensor, + has_manifest_changed, +) from compute_horde_miner.miner.models import Validator from compute_horde_miner.miner.receipts import current_store @@ -96,3 +102,58 @@ def archive_receipt_pages(): return store.archive_old_pages() + + +@app.task +def commit_manifest_to_chain(): + """ + Periodically checks and commits manifest if changed. + + Steps: + 1. Get current manifest from executor_manager + 2. Get on-chain commitment via subtensor.get_commitment() + 3. Compare: has_manifest_changed() + 4. If changed: + a. Commit to subtensor + b. Log success/failure + + Note: Empty manifests are allowed to support "pausing" a miner. + + Error handling: + - Rate limit: Skip silently, will retry next cycle + - Connection errors: Log warning, retry next cycle + - Format errors: Log critical error + """ + try: + # Get current manifest (empty manifest is valid for pausing) + manifest = async_to_sync(current.executor_manager.get_manifest)() + + # Get on-chain commitment + wallet = settings.BITTENSOR_WALLET() + subtensor = bittensor.subtensor(network=settings.BITTENSOR_NETWORK) + + try: + # Get commitment for this miner's hotkey + chain_commitment = subtensor.get_commitment( + netuid=settings.BITTENSOR_NETUID, + hotkey=wallet.hotkey.ss58_address, + ) + except Exception as e: + logger.warning(f"Failed to get on-chain commitment: {e}") + chain_commitment = None + + # Check if manifest has changed + if has_manifest_changed(manifest, chain_commitment): + logger.info("Manifest has changed, committing to chain") + success = commit_manifest_to_subtensor( + manifest, wallet, subtensor, settings.BITTENSOR_NETUID + ) + if success: + logger.info("Successfully committed manifest to chain") + else: + logger.warning("Failed to commit manifest to chain") + else: + logger.debug("Manifest unchanged, skipping commitment") + + except Exception as e: + logger.error(f"Error in commit_manifest_to_chain: {e}", exc_info=True) diff --git a/miner/app/src/compute_horde_miner/miner/tests/test_manifest_commitment.py b/miner/app/src/compute_horde_miner/miner/tests/test_manifest_commitment.py new file mode 100644 index 000000000..6a2eb3996 --- /dev/null +++ b/miner/app/src/compute_horde_miner/miner/tests/test_manifest_commitment.py @@ -0,0 +1,422 @@ +import pytest +from unittest.mock import Mock, patch + +from compute_horde.manifest_utils import MAX_COMMITMENT_LENGTH +from compute_horde_core.executor_class import ExecutorClass + +from compute_horde_miner.miner.manifest_commitment import ( + commit_manifest_to_subtensor, + format_manifest_commitment, + has_manifest_changed, + parse_commitment_string, +) + + +class TestFormatManifestCommitment: + def test_format_manifest_commitment(self): + manifest = { + ExecutorClass.spin_up_4min__gpu_24gb: 2, + ExecutorClass.always_on__gpu_24gb: 3, + } + result = format_manifest_commitment(manifest) + assert result == "always_on.gpu-24gb=3;spin_up-4min.gpu-24gb=2" + + def test_format_manifest_commitment_sorted(self): + # Test that order is consistent regardless of input order + manifest1 = { + ExecutorClass.always_on__gpu_24gb: 3, + ExecutorClass.spin_up_4min__gpu_24gb: 2, + } + manifest2 = { + ExecutorClass.spin_up_4min__gpu_24gb: 2, + ExecutorClass.always_on__gpu_24gb: 3, + } + assert format_manifest_commitment(manifest1) == format_manifest_commitment(manifest2) + + def test_format_manifest_commitment_no_trailing_semicolon(self): + manifest = {ExecutorClass.always_on__gpu_24gb: 1} + result = format_manifest_commitment(manifest) + assert not result.endswith(";") + + def test_format_manifest_commitment_empty(self): + manifest = {} + result = format_manifest_commitment(manifest) + assert result == "" + + def test_format_manifest_commitment_single_class(self): + manifest = {ExecutorClass.always_on__llm__a6000: 5} + result = format_manifest_commitment(manifest) + assert result == "always_on.llm.a6000=5" + + def test_format_manifest_commitment_all_classes(self): + manifest = { + ExecutorClass.spin_up_4min__gpu_24gb: 1, + ExecutorClass.always_on__gpu_24gb: 2, + ExecutorClass.always_on__llm__a6000: 3, + } + result = format_manifest_commitment(manifest) + expected = "always_on.gpu-24gb=2;always_on.llm.a6000=3;spin_up-4min.gpu-24gb=1" + assert result == expected + + def test_format_manifest_commitment_too_long(self): + # Create a manifest that would exceed max length (128 chars) + # Use all three executor classes with very large numbers + long_manifest = { + ExecutorClass.always_on__gpu_24gb: 999999999999999999999999999999999999999999999999, + ExecutorClass.always_on__llm__a6000: 999999999999999999999999999999999999999999999999, + ExecutorClass.spin_up_4min__gpu_24gb: 999999999999999999999999999999999999999999999999, + } + with pytest.raises(ValueError) as exc_info: + format_manifest_commitment(long_manifest) + assert "exceeds maximum" in str(exc_info.value) + + +class TestParseCommitmentString: + def test_parse_commitment_string(self): + commitment = "always_on.gpu-24gb=3;spin_up-4min.gpu-24gb=2" + result = parse_commitment_string(commitment) + expected = { + ExecutorClass.always_on__gpu_24gb: 3, + ExecutorClass.spin_up_4min__gpu_24gb: 2, + } + assert result == expected + + def test_parse_commitment_string_empty(self): + result = parse_commitment_string("") + assert result == {} + + def test_parse_commitment_string_whitespace(self): + result = parse_commitment_string(" ") + assert result == {} + + def test_parse_commitment_string_invalid_format(self): + # Missing equals sign + result = parse_commitment_string("always_on.gpu-24gb:3") + assert result == {} + + def test_parse_commitment_string_invalid_executor_class(self): + # Invalid executor class name + result = parse_commitment_string("invalid-class=3") + assert result == {} + + def test_parse_commitment_string_invalid_count(self): + # Non-numeric count + result = parse_commitment_string("always_on.gpu-24gb=abc") + assert result == {} + + def test_parse_commitment_string_with_spaces(self): + commitment = " always_on.gpu-24gb = 3 ; spin_up-4min.gpu-24gb = 2 " + result = parse_commitment_string(commitment) + expected = { + ExecutorClass.always_on__gpu_24gb: 3, + ExecutorClass.spin_up_4min__gpu_24gb: 2, + } + assert result == expected + + def test_parse_commitment_string_partial_invalid(self): + # One valid, one invalid + commitment = "always_on.gpu-24gb=3;invalid-class=2" + result = parse_commitment_string(commitment) + expected = {ExecutorClass.always_on__gpu_24gb: 3} + assert result == expected + + def test_parse_commitment_string_single(self): + commitment = "always_on.llm.a6000=5" + result = parse_commitment_string(commitment) + expected = {ExecutorClass.always_on__llm__a6000: 5} + assert result == expected + + +class TestHasManifestChanged: + def test_has_manifest_changed_when_different(self): + current_manifest = {ExecutorClass.always_on__gpu_24gb: 3} + chain_commitment = "always_on.gpu-24gb=2" + assert has_manifest_changed(current_manifest, chain_commitment) is True + + def test_has_manifest_changed_when_same(self): + current_manifest = {ExecutorClass.always_on__gpu_24gb: 3} + chain_commitment = "always_on.gpu-24gb=3" + assert has_manifest_changed(current_manifest, chain_commitment) is False + + def test_has_manifest_changed_when_no_chain_commitment(self): + current_manifest = {ExecutorClass.always_on__gpu_24gb: 3} + assert has_manifest_changed(current_manifest, None) is True + + def test_has_manifest_changed_when_empty_chain_commitment(self): + current_manifest = {ExecutorClass.always_on__gpu_24gb: 3} + assert has_manifest_changed(current_manifest, "") is True + + def test_has_manifest_changed_multiple_classes_same(self): + current_manifest = { + ExecutorClass.always_on__gpu_24gb: 3, + ExecutorClass.spin_up_4min__gpu_24gb: 2, + } + chain_commitment = "always_on.gpu-24gb=3;spin_up-4min.gpu-24gb=2" + assert has_manifest_changed(current_manifest, chain_commitment) is False + + def test_has_manifest_changed_multiple_classes_different_order(self): + # Order shouldn't matter + current_manifest = { + ExecutorClass.always_on__gpu_24gb: 3, + ExecutorClass.spin_up_4min__gpu_24gb: 2, + } + chain_commitment = "spin_up-4min.gpu-24gb=2;always_on.gpu-24gb=3" + assert has_manifest_changed(current_manifest, chain_commitment) is False + + def test_has_manifest_changed_different_classes(self): + current_manifest = {ExecutorClass.always_on__gpu_24gb: 3} + chain_commitment = "always_on.llm.a6000=3" + assert has_manifest_changed(current_manifest, chain_commitment) is True + + def test_has_manifest_changed_empty_manifest(self): + current_manifest = {} + chain_commitment = "always_on.gpu-24gb=3" + assert has_manifest_changed(current_manifest, chain_commitment) is True + + +class TestCommitManifestToSubtensor: + def test_commit_manifest_to_subtensor_success(self): + manifest = {ExecutorClass.always_on__gpu_24gb: 3} + wallet = Mock() + subtensor = Mock() + subtensor.commit = Mock(return_value=True) + netuid = 49 + + result = commit_manifest_to_subtensor(manifest, wallet, subtensor, netuid) + + assert result is True + subtensor.commit.assert_called_once_with(wallet, netuid, "always_on.gpu-24gb=3") + + def test_commit_manifest_to_subtensor_failure(self): + manifest = {ExecutorClass.always_on__gpu_24gb: 3} + wallet = Mock() + subtensor = Mock() + subtensor.commit = Mock(return_value=False) + netuid = 49 + + result = commit_manifest_to_subtensor(manifest, wallet, subtensor, netuid) + + assert result is False + + def test_commit_manifest_to_subtensor_exception(self): + manifest = {ExecutorClass.always_on__gpu_24gb: 3} + wallet = Mock() + subtensor = Mock() + subtensor.commit = Mock(side_effect=Exception("Network error")) + netuid = 49 + + result = commit_manifest_to_subtensor(manifest, wallet, subtensor, netuid) + + assert result is False + + def test_commit_manifest_to_subtensor_empty_manifest(self): + # Empty manifest should be allowed (for pausing) + manifest = {} + wallet = Mock() + subtensor = Mock() + subtensor.commit = Mock(return_value=True) + netuid = 49 + + result = commit_manifest_to_subtensor(manifest, wallet, subtensor, netuid) + + assert result is True + subtensor.commit.assert_called_once_with(wallet, netuid, "") + + def test_commit_manifest_to_subtensor_too_long(self): + # Create a manifest that would be too long (exceeds 128 chars) + manifest = { + ExecutorClass.always_on__gpu_24gb: 999999999999999999999999999999999999999999999999, + ExecutorClass.always_on__llm__a6000: 999999999999999999999999999999999999999999999999, + ExecutorClass.spin_up_4min__gpu_24gb: 999999999999999999999999999999999999999999999999, + } + wallet = Mock() + subtensor = Mock() + netuid = 49 + + result = commit_manifest_to_subtensor(manifest, wallet, subtensor, netuid) + + assert result is False + subtensor.commit.assert_not_called() + + def test_commit_manifest_to_subtensor_multiple_classes(self): + manifest = { + ExecutorClass.always_on__gpu_24gb: 3, + ExecutorClass.spin_up_4min__gpu_24gb: 2, + } + wallet = Mock() + subtensor = Mock() + subtensor.commit = Mock(return_value=True) + netuid = 49 + + result = commit_manifest_to_subtensor(manifest, wallet, subtensor, netuid) + + assert result is True + expected_commitment = "always_on.gpu-24gb=3;spin_up-4min.gpu-24gb=2" + subtensor.commit.assert_called_once_with(wallet, netuid, expected_commitment) + + +class TestRoundTrip: + """Test that format and parse are inverse operations""" + + def test_round_trip_single_class(self): + original = {ExecutorClass.always_on__gpu_24gb: 3} + commitment = format_manifest_commitment(original) + parsed = parse_commitment_string(commitment) + assert original == parsed + + def test_round_trip_multiple_classes(self): + original = { + ExecutorClass.always_on__gpu_24gb: 3, + ExecutorClass.spin_up_4min__gpu_24gb: 2, + ExecutorClass.always_on__llm__a6000: 1, + } + commitment = format_manifest_commitment(original) + parsed = parse_commitment_string(commitment) + assert original == parsed + + def test_round_trip_empty(self): + original = {} + commitment = format_manifest_commitment(original) + parsed = parse_commitment_string(commitment) + assert original == parsed + + +@pytest.mark.django_db +class TestCommitManifestToChainTask: + """Test the celery task""" + + @patch("compute_horde_miner.miner.tasks.commit_manifest_to_subtensor") + @patch("compute_horde_miner.miner.tasks.async_to_sync") + @patch("compute_horde_miner.miner.tasks.current") + @patch("compute_horde_miner.miner.tasks.settings") + @patch("compute_horde_miner.miner.tasks.bittensor") + def test_commit_manifest_to_chain_task_manifest_changed( + self, + mock_bittensor, + mock_settings, + mock_current, + mock_async_to_sync, + mock_commit, + ): + from compute_horde_miner.miner.tasks import commit_manifest_to_chain + + manifest = {ExecutorClass.always_on__gpu_24gb: 3} + mock_async_to_sync.side_effect = lambda f: lambda: manifest + + mock_wallet = Mock() + mock_wallet.hotkey.ss58_address = "test_hotkey" + mock_settings.BITTENSOR_WALLET.return_value = mock_wallet + mock_settings.BITTENSOR_NETWORK = "test" + mock_settings.BITTENSOR_NETUID = 49 + + mock_subtensor_instance = Mock() + mock_subtensor_instance.get_commitment.return_value = "always_on.gpu-24gb=2" + mock_bittensor.subtensor.return_value = mock_subtensor_instance + + mock_commit.return_value = True + + # Run task + commit_manifest_to_chain() + + # Verify commit was called + assert mock_commit.called + + @patch("compute_horde_miner.miner.tasks.commit_manifest_to_subtensor") + @patch("compute_horde_miner.miner.tasks.async_to_sync") + @patch("compute_horde_miner.miner.tasks.current") + @patch("compute_horde_miner.miner.tasks.settings") + @patch("compute_horde_miner.miner.tasks.bittensor") + def test_commit_manifest_to_chain_task_empty_manifest( + self, mock_bittensor, mock_settings, mock_current, mock_async_to_sync, mock_commit + ): + from compute_horde_miner.miner.tasks import commit_manifest_to_chain + + # Empty manifest should be allowed (for pausing) + manifest = {} + mock_async_to_sync.side_effect = lambda f: lambda: manifest + + mock_wallet = Mock() + mock_wallet.hotkey.ss58_address = "test_hotkey" + mock_settings.BITTENSOR_WALLET.return_value = mock_wallet + mock_settings.BITTENSOR_NETWORK = "test" + mock_settings.BITTENSOR_NETUID = 49 + + mock_subtensor_instance = Mock() + # Chain has a non-empty manifest, we want to commit empty + mock_subtensor_instance.get_commitment.return_value = "always_on.gpu-24gb=3" + mock_bittensor.subtensor.return_value = mock_subtensor_instance + + mock_commit.return_value = True + + # Run task - should commit empty manifest + commit_manifest_to_chain() + + # Should commit empty manifest + assert mock_commit.called + mock_commit.assert_called_once_with(manifest, mock_wallet, mock_subtensor_instance, 49) + + @patch("compute_horde_miner.miner.tasks.commit_manifest_to_subtensor") + @patch("compute_horde_miner.miner.tasks.async_to_sync") + @patch("compute_horde_miner.miner.tasks.current") + @patch("compute_horde_miner.miner.tasks.settings") + @patch("compute_horde_miner.miner.tasks.bittensor") + @patch("compute_horde_miner.miner.tasks.logger") + def test_commit_manifest_to_chain_task_unchanged_manifest( + self, + mock_logger, + mock_bittensor, + mock_settings, + mock_current, + mock_async_to_sync, + mock_commit, + ): + from compute_horde_miner.miner.tasks import commit_manifest_to_chain + + manifest = {ExecutorClass.always_on__gpu_24gb: 3} + mock_async_to_sync.side_effect = lambda f: lambda: manifest + + mock_wallet = Mock() + mock_wallet.hotkey.ss58_address = "test_hotkey" + mock_settings.BITTENSOR_WALLET.return_value = mock_wallet + mock_settings.BITTENSOR_NETWORK = "test" + mock_settings.BITTENSOR_NETUID = 49 + + mock_subtensor_instance = Mock() + # Same as current manifest + mock_subtensor_instance.get_commitment.return_value = "always_on.gpu-24gb=3" + mock_bittensor.subtensor.return_value = mock_subtensor_instance + + # Run task + commit_manifest_to_chain() + + # Should not call commit + assert not mock_commit.called + # Should log that manifest is unchanged + assert any("unchanged" in str(call).lower() for call in mock_logger.debug.call_args_list) + + @patch("compute_horde_miner.miner.tasks.async_to_sync") + @patch("compute_horde_miner.miner.tasks.current") + @patch("compute_horde_miner.miner.tasks.settings") + @patch("compute_horde_miner.miner.tasks.bittensor") + @patch("compute_horde_miner.miner.tasks.logger") + def test_commit_manifest_to_chain_task_exception_handling( + self, + mock_logger, + mock_bittensor, + mock_settings, + mock_current, + mock_async_to_sync, + ): + from compute_horde_miner.miner.tasks import commit_manifest_to_chain + + # Make async_to_sync raise an exception + def raise_error(): + raise Exception("Test error") + + mock_async_to_sync.side_effect = lambda f: raise_error + + # Run task - should not raise exception + commit_manifest_to_chain() + + # Should log error + assert mock_logger.error.called diff --git a/miner/app/src/compute_horde_miner/settings.py b/miner/app/src/compute_horde_miner/settings.py index b1317064d..1b15a935e 100644 --- a/miner/app/src/compute_horde_miner/settings.py +++ b/miner/app/src/compute_horde_miner/settings.py @@ -329,6 +329,13 @@ def wrapped(*args, **kwargs): "schedule": timedelta(minutes=1), "options": {}, }, + "commit_manifest_to_chain": { + "task": "compute_horde_miner.miner.tasks.commit_manifest_to_chain", + "schedule": timedelta(minutes=5), + "options": { + "expires": timedelta(minutes=5).total_seconds(), + }, + }, } if IS_LOCAL_MINER: diff --git a/miner/app/src/pytest.ini b/miner/app/src/pytest.ini index 55069d11d..3df2f28c8 100644 --- a/miner/app/src/pytest.ini +++ b/miner/app/src/pytest.ini @@ -7,4 +7,4 @@ filterwarnings = error default::DeprecationWarning default:Error when trying to teardown test databases -asyncio_default_fixture_loop_scope = function +asyncio_mode = auto diff --git a/validator/app/src/compute_horde_validator/settings.py b/validator/app/src/compute_horde_validator/settings.py index 9b54853aa..8a4056861 100644 --- a/validator/app/src/compute_horde_validator/settings.py +++ b/validator/app/src/compute_horde_validator/settings.py @@ -437,6 +437,11 @@ def wrapped(*args, **kwargs): "Additional extension of the allowed block range when validating spendings (upper bound)", int, ), + "DYNAMIC_USE_MANIFEST_COMMITMENTS": ( + False, + "Read manifests from Bittensor knowledge commitments instead of WebSocket connections to miners", + bool, + ), } # Content Security Policy diff --git a/validator/app/src/compute_horde_validator/validator/allowance/utils/manifests.py b/validator/app/src/compute_horde_validator/validator/allowance/utils/manifests.py index acae1d17f..eb2afee45 100644 --- a/validator/app/src/compute_horde_validator/validator/allowance/utils/manifests.py +++ b/validator/app/src/compute_horde_validator/validator/allowance/utils/manifests.py @@ -5,8 +5,11 @@ import operator from functools import reduce +from compute_horde.manifest_utils import parse_commitment_string from compute_horde.miner_client.organic import OrganicMinerClient from compute_horde_core.executor_class import ExecutorClass +from constance import config +from django.conf import settings as django_settings from django.db import transaction from django.db.models import Min, Q @@ -150,20 +153,78 @@ def event_loop(): return loop +def fetch_manifests_from_commitments( + hotkeys: list[ss58_address], +) -> dict[tuple[ss58_address, ExecutorClass], int]: + """ + Fetch manifests from Bittensor knowledge commitments. + + Only includes results for miners that have commitments on chain. + """ + subtensor_instance = supertensor().subtensor() + netuid = django_settings.BITTENSOR_NETUID + + result = {} + for hotkey in hotkeys: + try: + # Get commitment from chain + commitment = subtensor_instance.get_commitment( + netuid=netuid, + hotkey=hotkey, + ) + + if commitment: + # Parse commitment string to manifest dict + manifest = parse_commitment_string(commitment) + + # Add all executor classes from manifest to result + for executor_class, count in manifest.items(): + result[(hotkey, executor_class)] = count + + logger.debug(f"Fetched commitment for {hotkey}: {commitment}") + else: + logger.debug(f"No commitment found for {hotkey}") + + except Exception as e: + logger.warning(f"Failed to get commitment for {hotkey}: {e}") + continue + + logger.info(f"Fetched {len(result)} manifest entries from {len(hotkeys)} hotkeys via commitments") + return result + + def sync_manifests(): block = supertensor().get_current_block() neurons = supertensor().get_shielded_neurons() max_executors_per_class = get_miner_max_executors_per_class_sync() - miners = [ - ( - n.hotkey, - getattr(n.axon_info, "shield_address", str(n.axon_info.ip)), - n.axon_info.port, - ) - for n in neurons - if n.axon_info.port - ] - new_manifests = event_loop().run_until_complete(fetch_manifests_from_miners(miners)) + + # Check if we should read from knowledge commitments or via WebSocket + if config.DYNAMIC_USE_MANIFEST_COMMITMENTS: + logger.info("Fetching manifests from Bittensor knowledge commitments") + hotkeys = [n.hotkey for n in neurons] + new_manifests = fetch_manifests_from_commitments(hotkeys) + # For commitments, we don't have miner addresses, so we create a minimal list + miners = [ + ( + n.hotkey, + getattr(n.axon_info, "shield_address", str(n.axon_info.ip)), + n.axon_info.port, + ) + for n in neurons + if n.axon_info.port + ] + else: + logger.info("Fetching manifests from miners via WebSocket") + miners = [ + ( + n.hotkey, + getattr(n.axon_info, "shield_address", str(n.axon_info.ip)), + n.axon_info.port, + ) + for n in neurons + if n.axon_info.port + ] + new_manifests = event_loop().run_until_complete(fetch_manifests_from_miners(miners)) with transaction.atomic(): with Lock(LockType.ALLOWANCE_BLOCK_INJECTION, 10.0): # This will throw an error if the lock cannot be obtained in 10.0s and that's correct