diff --git a/compute_horde/compute_horde/executor_class.py b/compute_horde/compute_horde/executor_class.py
index 8e0ebc5f1..7367ab2f4 100644
--- a/compute_horde/compute_horde/executor_class.py
+++ b/compute_horde/compute_horde/executor_class.py
@@ -44,6 +44,10 @@ class ExecutorClassSpec:
 # TODO: TIMEOUTS - this should depend on the requested job timing instead, but capped at seconds left in current cycle
 MAX_EXECUTOR_TIMEOUT = timedelta(minutes=20).total_seconds()
 
+# Executor classes considered to be the ones used for LLM-type jobs
+LLM_EXECUTOR_CLASSES = {ExecutorClass.always_on__llm__a6000}
+
 DEFAULT_EXECUTOR_CLASS = ExecutorClass.spin_up_4min__gpu_24gb
 DEFAULT_LLM_EXECUTOR_CLASS = ExecutorClass.always_on__llm__a6000
 DEFAULT_EXECUTOR_TIMEOUT = EXECUTOR_CLASS[DEFAULT_EXECUTOR_CLASS].spin_up_time
+
diff --git a/validator/app/src/compute_horde_validator/settings.py b/validator/app/src/compute_horde_validator/settings.py
index 386c34fb4..49d23de12 100644
--- a/validator/app/src/compute_horde_validator/settings.py
+++ b/validator/app/src/compute_horde_validator/settings.py
@@ -244,7 +244,7 @@ def wrapped(*args, **kwargs):
         int,
     ),
     # llama params
-    "DYNAMIC_MAX_PROMPT_SERIES": (
+    "DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS": (
         3500,
         "Maximum number of prompt series upon which the prompt generator will not be triggered",
         int,
@@ -254,22 +254,11 @@ def wrapped(*args, **kwargs):
         "how many prompt samples to generate (should be larger than how many prompts series we use per synthetic run)",
         int,
     ),
-    "DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD": (
-        240,
-        "how many prompts to answer in a single workload",
-        int,
-    ),
-    # prompt generation params
     "DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION": (
         25,
         "Number of batches that prompt generator will process in a single go",
         int,
     ),
-    "DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES": (
-        240,
-        "Number of prompts to generate in a single series",
-        int,
-    ),
     # prompts answering params
     "DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES": (
         1,
diff --git a/validator/app/src/compute_horde_validator/validator/cross_validation/generator/base.py b/validator/app/src/compute_horde_validator/validator/cross_validation/generator/base.py
index 0a04e1826..aaa155627 100644
--- a/validator/app/src/compute_horde_validator/validator/cross_validation/generator/base.py
+++ b/validator/app/src/compute_horde_validator/validator/cross_validation/generator/base.py
@@ -6,6 +6,7 @@
 from compute_horde_core.executor_class import ExecutorClass
 from compute_horde_core.output_upload import OutputUpload
 from compute_horde_core.volume import Volume
+from compute_horde_validator.validator.generation_profile import PromptGenerationProfile, PROMPT_GENERATION_PROFILES
 
 
 class BasePromptJobGenerator(abc.ABC):
@@ -13,12 +14,12 @@ def __init__(
         self,
         _uuid: uuid.UUID,
         *,
-        num_prompts_per_batch: int,
+        profile: PromptGenerationProfile,
         batch_uuids: list[uuid.UUID],
         upload_urls: list[str],
     ) -> None:
         self._uuid = _uuid
-        self.num_prompts_per_batch = num_prompts_per_batch
+        self.profile = profile
         self.batch_uuids = batch_uuids
         self.upload_urls = upload_urls
 
@@ -33,6 +34,10 @@ def docker_image_name(self) -> str: ...
 
     @abc.abstractmethod
    def executor_class(self) -> ExecutorClass: ...
+    """
+    This is the executor class used to run the generation job.
+    It need not be related to the actual generation profile.
+    """
 
     def docker_run_options_preset(self) -> DockerRunOptionsPreset:
         return "nvidia_all"
@@ -46,6 +51,9 @@ def volume(self) -> Volume | None:
     def output(self) -> OutputUpload | None:
         return None
 
+    def num_prompts_per_batch(self) -> int:
+        return PROMPT_GENERATION_PROFILES[self.profile].num_prompts
+
     def get_job_details(self) -> OrganicJobDetails:
         return OrganicJobDetails(
             job_uuid=str(self._uuid),
diff --git a/validator/app/src/compute_horde_validator/validator/cross_validation/generator/v0/__init__.py b/validator/app/src/compute_horde_validator/validator/cross_validation/generator/v0/__init__.py
index 9406fa7f9..46321edef 100644
--- a/validator/app/src/compute_horde_validator/validator/cross_validation/generator/v0/__init__.py
+++ b/validator/app/src/compute_horde_validator/validator/cross_validation/generator/v0/__init__.py
@@ -11,6 +11,8 @@
 
 
 class PromptJobGenerator(BasePromptJobGenerator):
+    EXTRA_PROMPTS_TO_ACCOUNT_FOR_SOME_INVALID_ONES = 10
+
     def generator_version(self) -> int:
         return 0
 
@@ -21,6 +23,9 @@ def docker_image_name(self) -> str:
         return f"backenddevelopersltd/compute-horde-prompt-gen-{settings.PROMPT_GENERATION_MODEL}:v0-latest"
 
     def executor_class(self) -> ExecutorClass:
+        # Currently we don't care which executor class generates the prompts.
+        # However, if we wanted to, we could pick one of the executor classes that should be used
+        # for solving the prompts according to the generation profile.
         return ExecutorClass.always_on__llm__a6000
 
     def docker_run_cmd(self) -> list[str]:
@@ -28,11 +33,12 @@
             "--quantize",
             "--model_name",
             settings.PROMPT_GENERATION_MODEL,
-            "--batch_size=250",  # on A6000 we want 240 prompts generated in single file, but not all results are valid
+            "--batch_size",
+            str(self.num_prompts_per_batch() + PromptJobGenerator.EXTRA_PROMPTS_TO_ACCOUNT_FOR_SOME_INVALID_ONES),
             "--num_return_sequences=1",
             "--max_new_tokens=40",  # 40 new tokens is enough for reasonable length prompt - 30 caused too much cut off prompts
             "--number_of_prompts_per_batch",
-            str(self.num_prompts_per_batch),
+            str(self.num_prompts_per_batch()),
             "--uuids",
             str(",".join(map(str, self.batch_uuids))),
         ]
diff --git a/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_answering.py b/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_answering.py
index 111553b36..366c8e12e 100644
--- a/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_answering.py
+++ b/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_answering.py
@@ -57,20 +57,17 @@ async def answer_prompts(
     job_generator = LlmPromptsJobGenerator(workload.s3_url, seed)
     await job_generator.ainit(miner_hotkey=TRUSTED_MINER_FAKE_KEY)
 
-    # TODO: Should be generated for all the llm executor classes.
-    # SolveWorkload/PromptSample should have a executor_class field saying which
-    # executor_class this sample is for.
job_uuid = job_uuid or uuid.uuid4() job_details = OrganicJobDetails( job_uuid=str(job_uuid), - executor_class=ExecutorClass.always_on__llm__a6000, + executor_class=workload.executor_class, docker_image=job_generator.docker_image_name(), docker_run_options_preset=job_generator.docker_run_options_preset(), docker_run_cmd=job_generator.docker_run_cmd(), total_job_timeout=( job_generator.timeout_seconds() + max( - EXECUTOR_CLASS[ExecutorClass.always_on__llm__a6000].spin_up_time, + EXECUTOR_CLASS[workload.executor_class].spin_up_time, MIN_SPIN_UP_TIME, ) ), diff --git a/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_generation.py b/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_generation.py index 3a5f86dda..47e84ee3a 100644 --- a/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_generation.py +++ b/validator/app/src/compute_horde_validator/validator/cross_validation/prompt_generation.py @@ -15,6 +15,7 @@ TrustedMinerClient, ) from compute_horde_validator.validator.dynamic_config import aget_config +from compute_horde_validator.validator.generation_profile import PromptGenerationProfile from compute_horde_validator.validator.models import PromptSeries, SystemEvent from compute_horde_validator.validator.s3 import generate_upload_url, get_public_url @@ -26,6 +27,7 @@ async def generate_prompts( *, + profile: PromptGenerationProfile, create_miner_client: Callable[..., TrustedMinerClient] | None = None, job_uuid: uuid.UUID | None = None, wait_timeout: int | None = None, @@ -45,13 +47,12 @@ async def generate_prompts( job_uuid = job_uuid or uuid.uuid4() num_batches = await aget_config("DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION") - num_prompts_per_batch = await aget_config("DYNAMIC_NUMBER_OF_PROMPTS_IN_SERIES") series_uuids, upload_urls, public_urls = _generate_uuids_and_urls(num_batches) job_generator = prompt_job_generator( job_uuid, - num_prompts_per_batch=num_prompts_per_batch, + profile=profile, batch_uuids=series_uuids, upload_urls=upload_urls, ) @@ -86,7 +87,9 @@ async def generate_prompts( type=SystemEvent.EventType.LLM_PROMPT_GENERATION, subtype=SystemEvent.EventSubType.FAILURE, long_description=f"Trusted miner failed to run prompt generation job: {e!r}", - data={}, + data={ + "profile": profile, + }, ) logger.warning("Failed to run organic job", exc_info=True) return @@ -103,6 +106,7 @@ async def generate_prompts( "completed_at": completed_at.isoformat(), "duration": (completed_at - started_at).total_seconds(), "count": len(series_uuids), + "profile": profile, }, ) diff --git a/validator/app/src/compute_horde_validator/validator/generation_profile.py b/validator/app/src/compute_horde_validator/validator/generation_profile.py new file mode 100644 index 000000000..5d9ee03c8 --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/generation_profile.py @@ -0,0 +1,35 @@ +from dataclasses import dataclass +from enum import StrEnum + +from compute_horde_core.executor_class import ExecutorClass +from compute_horde_validator.settings import PROMPT_JOB_GENERATOR + + +class PromptGenerationProfile(StrEnum): + default_a6000 = "default_a6000" + default_a100 = "default_a100" + + +@dataclass +class GenerationProfileSpec: + description: str + num_prompts: int + + +PROMPT_GENERATION_PROFILES = { + PromptGenerationProfile.default_a6000: GenerationProfileSpec( + description="The default generation profile for a6000 cards", + num_prompts=240 + ), + # this is a currently unused in production, fake generation 
profile + # that's used for testing the multi-hw support + PromptGenerationProfile.default_a100: GenerationProfileSpec( + description="The default generation profile for a100 cards", + num_prompts=777 + ) +} + + +EXECUTOR_TO_PROMPT_GENERATION_PROFILE = { + ExecutorClass.always_on__llm__a6000: PromptGenerationProfile.default_a6000, +} diff --git a/validator/app/src/compute_horde_validator/validator/migrations/0061_promptseries_generation_profile_and_more.py b/validator/app/src/compute_horde_validator/validator/migrations/0061_promptseries_generation_profile_and_more.py new file mode 100644 index 000000000..e036f7271 --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/migrations/0061_promptseries_generation_profile_and_more.py @@ -0,0 +1,25 @@ +# Generated by Django 4.2.19 on 2025-03-31 20:35 + +import compute_horde_core.executor_class +import compute_horde_validator.validator.generation_profile +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('validator', '0060_remove_adminjobrequest_raw_script'), + ] + + operations = [ + migrations.AddField( + model_name='promptseries', + name='generation_profile', + field=models.CharField(choices=[(compute_horde_validator.validator.generation_profile.PromptGenerationProfile['default_a6000'], compute_horde_validator.validator.generation_profile.PromptGenerationProfile['default_a6000']), (compute_horde_validator.validator.generation_profile.PromptGenerationProfile['default_a100'], compute_horde_validator.validator.generation_profile.PromptGenerationProfile['default_a100'])], default=compute_horde_validator.validator.generation_profile.PromptGenerationProfile['default_a6000'], max_length=255), + ), + migrations.AddField( + model_name='solveworkload', + name='executor_class', + field=models.CharField(choices=[(compute_horde_core.executor_class.ExecutorClass['spin_up_4min__gpu_24gb'], compute_horde_core.executor_class.ExecutorClass['spin_up_4min__gpu_24gb']), (compute_horde_core.executor_class.ExecutorClass['always_on__gpu_24gb'], compute_horde_core.executor_class.ExecutorClass['always_on__gpu_24gb']), (compute_horde_core.executor_class.ExecutorClass['always_on__llm__a6000'], compute_horde_core.executor_class.ExecutorClass['always_on__llm__a6000'])], default=compute_horde_core.executor_class.ExecutorClass['always_on__llm__a6000'], max_length=255), + ), + ] diff --git a/validator/app/src/compute_horde_validator/validator/models.py b/validator/app/src/compute_horde_validator/validator/models.py index 48365d550..cf5441d5e 100644 --- a/validator/app/src/compute_horde_validator/validator/models.py +++ b/validator/app/src/compute_horde_validator/validator/models.py @@ -7,8 +7,10 @@ from os import urandom from typing import Self +from .generation_profile import PromptGenerationProfile from asgiref.sync import sync_to_async from compute_horde.executor_class import DEFAULT_EXECUTOR_CLASS +from compute_horde_core.executor_class import ExecutorClass from compute_horde.subtensor import get_cycle_containing_block from compute_horde.utils import MIN_VALIDATOR_STAKE from compute_horde_core.output_upload import OutputUpload, ZipAndHttpPutUpload @@ -511,6 +513,11 @@ class PromptSeries(models.Model): s3_url = models.URLField(max_length=1000) created_at = models.DateTimeField(default=now) generator_version = models.PositiveSmallIntegerField() + generation_profile = models.CharField( + max_length=255, + choices=[(profile, profile) for profile in PromptGenerationProfile], + 
default=PromptGenerationProfile.default_a6000 + ) class SolveWorkload(models.Model): @@ -523,6 +530,11 @@ class SolveWorkload(models.Model): s3_url = models.URLField(max_length=1000) created_at = models.DateTimeField(default=now) finished_at = models.DateTimeField(null=True, default=None, db_index=True) + executor_class = models.CharField( + max_length=255, + choices=[(cls, cls) for cls in ExecutorClass], + default=ExecutorClass.always_on__llm__a6000 + ) def __str__(self): return f"uuid: {self.workload_uuid} - seed: {self.seed}" diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py index 60ba3ec1c..eb8a16f6a 100644 --- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py +++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/batch_run.py @@ -17,6 +17,7 @@ from asgiref.sync import sync_to_async from channels.layers import get_channel_layer from compute_horde.executor_class import EXECUTOR_CLASS +from compute_horde.executor_class import LLM_EXECUTOR_CLASSES from compute_horde.miner_client.base import ( AbstractMinerClient, UnsupportedMessageReceived, @@ -73,6 +74,7 @@ get_streaming_job_executor_classes, get_system_event_limits, ) +from compute_horde_validator.validator.generation_profile import EXECUTOR_TO_PROMPT_GENERATION_PROFILE from compute_horde_validator.validator.models import ( Miner, MinerManifest, @@ -130,9 +132,6 @@ SYNTHETIC_JOBS_SOFT_LIMIT = 20 * 60 SYNTHETIC_JOBS_HARD_LIMIT = SYNTHETIC_JOBS_SOFT_LIMIT + 10 -# Executor class considered to be the one used for LLM-type jobs -LLM_EXECUTOR_CLASS = ExecutorClass.always_on__llm__a6000 - class MinerClient(AbstractMinerClient[MinerToValidatorMessage, ValidatorToMinerMessage]): def __init__( @@ -646,8 +645,12 @@ def emit_telemetry_event(self) -> SystemEvent | None: average_job_send_time=_timedelta_dump(self.average_job_send_time), counts=counts, llm_counts={ - "llm_executor_count": self.get_executor_count(LLM_EXECUTOR_CLASS), - **calculate_llm_prompt_sample_counts(), + "llm_executor_count": sum(self.get_executor_count_per_class(LLM_EXECUTOR_CLASSES).values()), + **dict(sum( + (Counter(calculate_llm_prompt_sample_counts(executor_class)) + for executor_class in LLM_EXECUTOR_CLASSES), + Counter() + )), }, manifests=self.manifests, loop_profiling=self.loop_profiler.get() if self.loop_profiler is not None else None, @@ -659,16 +662,16 @@ def emit_telemetry_event(self) -> SystemEvent | None: data=data, ) - def get_executor_count(self, executor_class: ExecutorClass) -> int: + def get_executor_count_per_class(self, executor_classes: set[ExecutorClass]) -> dict[ExecutorClass, int]: """ - Calculate the total count of executors of given class. + Calculate the count of executors for given classes. 
""" - return sum( - count - for executors in self.executors.values() - for _executor_class, count in executors.items() - if _executor_class == executor_class - ) + result = defaultdict(int) + for executors in self.executors.values(): + for executor_class, count in executors.items(): + if executor_class in executor_classes: + result[executor_class] += count + return dict(result) def _get_job_count(self, executor_class: ExecutorClass | None) -> dict[str, int]: jobs = list(self.jobs.values()) @@ -1025,14 +1028,14 @@ async def _close_client(ctx: BatchContext, miner_hotkey: str) -> None: await client.close() -def calculate_llm_prompt_sample_counts() -> dict[str, int]: +def calculate_llm_prompt_sample_counts(executor_class: ExecutorClass) -> dict[str, int]: """ Calculate counts of LLM jobs, grouped by whether they are used and/or answered. """ - prompt_series_total_count = PromptSeries.objects.count() + prompt_series_total_count = PromptSeries.objects.filter(generation_profile=EXECUTOR_TO_PROMPT_GENERATION_PROFILE[executor_class]).count() prompt_sample_types = ( - PromptSample.objects.annotate( + PromptSample.objects.filter(workload__executor_class=executor_class).annotate( is_used=ExpressionWrapper( Q(synthetic_job_id__isnull=False), output_field=BooleanField(), @@ -1071,6 +1074,9 @@ def calculate_llm_prompt_sample_counts() -> dict[str, int]: @sync_to_async def _not_enough_prompts_system_event( ctx: BatchContext, + executor_class: ExecutorClass, + prompt_samples_count: int, + llm_executor_count: int, ) -> None: if cache.get("insufficient_prompts_telemetry_sent"): logger.warning("skipping INSUFFICIENT_PROMPTS system event, already exists in 24h") @@ -1082,34 +1088,40 @@ def _not_enough_prompts_system_event( description="not enough prompt samples available in database", func="get_llm_prompt_samples", data={ - "llm_executor_count": ctx.get_executor_count(LLM_EXECUTOR_CLASS), - **calculate_llm_prompt_sample_counts(), + "llm_executor_count": llm_executor_count, + "prompt_samples_count": prompt_samples_count, + **calculate_llm_prompt_sample_counts(executor_class), }, ) cache.set("insufficient_prompts_telemetry_sent", True, timeout=24 * 60 * 60) -async def get_llm_prompt_samples(ctx: BatchContext) -> list[PromptSample] | None: +async def get_llm_prompt_samples(ctx: BatchContext) -> dict[ExecutorClass, list[PromptSample]]: # TODO: refactor into nicer abstraction - llm_executor_count = ctx.get_executor_count(LLM_EXECUTOR_CLASS) - prompt_samples_qs = ( - PromptSample.objects.select_related("series", "workload") - .prefetch_related("prompts") - .filter( - synthetic_job__isnull=True, - workload__finished_at__isnull=False, - )[:llm_executor_count] - ) - prompt_samples = [ps async for ps in prompt_samples_qs] - if len(prompt_samples) < llm_executor_count: - await _not_enough_prompts_system_event(ctx) - logger.warning( - "Not enough prompt samples for llm executors: %d < %d - will NOT run llm synthetic prompt jobs", - len(prompt_samples), - llm_executor_count, + prompt_samples_per_class: dict[ExecutorClass, list[PromptSample]] = {} + for executor_class in LLM_EXECUTOR_CLASSES: + llm_executor_count = ctx.get_executor_count_per_class({executor_class})[executor_class] + prompt_samples_qs = ( + PromptSample.objects.select_related("series", "workload") + .prefetch_related("prompts") + .filter( + synthetic_job__isnull=True, + workload__finished_at__isnull=False, + workload__executor_class=executor_class, + )[:llm_executor_count] ) - return None - return prompt_samples + prompt_samples = [ps async for ps in 
prompt_samples_qs] + if len(prompt_samples) < llm_executor_count: + await _not_enough_prompts_system_event(ctx, executor_class, len(prompt_samples), llm_executor_count) + logger.warning( + "Not enough prompt samples for llm executor class %s: %d < %d - will NOT run llm synthetic prompt jobs", + str(executor_class), + len(prompt_samples), + llm_executor_count, + ) + else: + prompt_samples_per_class[executor_class] = prompt_samples + return prompt_samples_per_class async def _generate_jobs(ctx: BatchContext) -> None: @@ -1117,8 +1129,9 @@ async def _generate_jobs(ctx: BatchContext) -> None: start_time = time.time() generated_job_count = 0 - prompt_samples = await get_llm_prompt_samples(ctx) - prompt_samples_iter = iter(prompt_samples) if prompt_samples is not None else None + prompt_samples_per_class = await get_llm_prompt_samples(ctx) + prompt_samples_iter = {executor_class: iter(prompt_samples) + for executor_class, prompt_samples in prompt_samples_per_class.items()} for hotkey, executors in ctx.executors.items(): miner_name = ctx.names[hotkey] @@ -1126,16 +1139,18 @@ async def _generate_jobs(ctx: BatchContext) -> None: job_generators = [] for _ in range(count): kwargs = {} - if executor_class == LLM_EXECUTOR_CLASS: - if prompt_samples_iter is None: - logger.warning("No llm prompt samples available, skipping llm job") + if executor_class in LLM_EXECUTOR_CLASSES: + logger.info(f"Generating llm job for executor class {executor_class}") + if executor_class not in prompt_samples_iter: + logger.warning(f"No llm prompt samples available for executor class {executor_class}, skipping llm job") continue - prompt_sample = next(prompt_samples_iter, None) + assert prompt_samples_iter[executor_class] is not None, f"prompt samples iterator for executor class {executor_class} is None?" 
+ prompt_sample = next(prompt_samples_iter[executor_class], None) if prompt_sample is None: # it means that there is some bug - we want to see it in sentry # and continue, so other executor classes are not affected logger.error( - "Dried prompt_samples_iter, this should not happen, skipping llm job" + f"Dried prompt_samples_iter for executor class {executor_class}, this should not happen, skipping llm job" ) continue kwargs = { @@ -1147,6 +1162,8 @@ async def _generate_jobs(ctx: BatchContext) -> None: # enable streaming for specific llm jobs executor classes if executor_class in streaming_classes: kwargs["streaming"] = True + else: + logger.info(f"Generating non-llm job for executor class {executor_class}") job_generator = await current.synthetic_job_generator_factory.create( executor_class, **kwargs @@ -2034,7 +2051,7 @@ async def _download_llm_prompts_answers(ctx: BatchContext) -> None: for job in ctx.jobs.values(): if ( - job.executor_class == LLM_EXECUTOR_CLASS + job.executor_class in LLM_EXECUTOR_CLASSES and isinstance(job.job_generator, LlmPromptsSyntheticJobGenerator) and isinstance(job.job_response, V0JobFinishedRequest) ): @@ -2186,7 +2203,7 @@ def _db_persist(ctx: BatchContext) -> None: prompt_samples: list[PromptSample] = [] for job in ctx.jobs.values(): - if job.executor_class != LLM_EXECUTOR_CLASS: + if job.executor_class not in LLM_EXECUTOR_CLASSES: continue if not isinstance(job.job_generator, LlmPromptsSyntheticJobGenerator): logger.warning(f"Skipped non-LLM job: {job.job_generator.__class__.__name__}") diff --git a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/factory.py b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/factory.py index 81827afe0..09119909b 100644 --- a/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/factory.py +++ b/validator/app/src/compute_horde_validator/validator/synthetic_jobs/generator/factory.py @@ -1,4 +1,5 @@ from compute_horde_core.executor_class import ExecutorClass +from compute_horde.executor_class import LLM_EXECUTOR_CLASSES from compute_horde_validator.validator.synthetic_jobs.generator.base import ( BaseSyntheticJobGenerator, @@ -14,6 +15,6 @@ class DefaultSyntheticJobGeneratorFactory(BaseSyntheticJobGeneratorFactory): async def create(self, executor_class: ExecutorClass, **kwargs) -> BaseSyntheticJobGenerator: - if executor_class == ExecutorClass.always_on__llm__a6000: + if executor_class in LLM_EXECUTOR_CLASSES: return LlmPromptsSyntheticJobGenerator(**kwargs) return GPUHashcatSyntheticJobGenerator(**kwargs) diff --git a/validator/app/src/compute_horde_validator/validator/tasks.py b/validator/app/src/compute_horde_validator/validator/tasks.py index f8c2ea0e6..91fa20373 100644 --- a/validator/app/src/compute_horde_validator/validator/tasks.py +++ b/validator/app/src/compute_horde_validator/validator/tasks.py @@ -27,13 +27,14 @@ from celery import shared_task from celery.result import AsyncResult, allow_join_result from celery.utils.log import get_task_logger +from compute_horde_core.executor_class import ExecutorClass from compute_horde.dynamic_config import sync_dynamic_config from compute_horde.fv_protocol.facilitator_requests import OrganicJobRequest -from compute_horde.miner_client.organic import OrganicMinerClient -from compute_horde.subtensor import TEMPO, get_cycle_containing_block -from compute_horde.transport.base import TransportConnectionError -from compute_horde.utils import MIN_VALIDATOR_STAKE, turbobt_get_validators -from 
compute_horde_core.executor_class import ExecutorClass +from compute_horde_validator.validator.generation_profile import ( + PromptGenerationProfile, + EXECUTOR_TO_PROMPT_GENERATION_PROFILE, + PROMPT_GENERATION_PROFILES, +) from constance import config from django.conf import settings from django.db import transaction @@ -1711,19 +1712,43 @@ def fetch_dynamic_config() -> None: time_limit=5 * 60, ) def llm_prompt_generation(): - unprocessed_workloads_count = SolveWorkload.objects.filter(finished_at__isnull=True).count() + for profile in PromptGenerationProfile: + llm_prompt_generation_for_profile(profile) + +def llm_prompt_generation_for_profile(profile: PromptGenerationProfile): + logger.info("Generating prompts for profile %s", profile) + # Get executor classes that use this generation profile + executor_classes = [ + executor_class for executor_class, gen_profile + in EXECUTOR_TO_PROMPT_GENERATION_PROFILE.items() + if gen_profile == profile + ] + + if not executor_classes: + logger.info("No executor class uses profile %s; skipping generation", profile) + return + + unprocessed_workloads_count = SolveWorkload.objects.filter( + finished_at__isnull=True, + executor_class__in=executor_classes + ).count() + if unprocessed_workloads_count > 0: # prevent any starvation issues - logger.info("Unprocessed workloads found - skipping prompt generation") + logger.info("Unprocessed workloads found for profile %s - skipping prompt generation", profile) return - num_expected_prompt_series = config.DYNAMIC_MAX_PROMPT_SERIES - num_prompt_series = PromptSeries.objects.count() + num_expected_prompt_series = config.DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS * len(executor_classes) + num_prompt_series = PromptSeries.objects.filter( + generation_profile=profile, + ).count() if num_prompt_series >= num_expected_prompt_series: logger.warning( - "There are %s series in the db - skipping prompt generation", + "There are %s series in the db, more than %d for each of %d executor classes - skipping prompt generation", num_prompt_series, + config.DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS, + len(executor_classes), ) return @@ -1735,6 +1760,7 @@ def llm_prompt_generation(): data={ "prompt_series_count": num_prompt_series, "expected_prompt_series_count": num_expected_prompt_series, + "prompt_generation_profile": profile, }, ) @@ -1746,7 +1772,7 @@ def llm_prompt_generation(): return try: - async_to_sync(generate_prompts)() + async_to_sync(generate_prompts)(profile=profile) except Exception as e: msg = f"Error while generating prompts: {e}" logger.warning(msg) @@ -1754,7 +1780,9 @@ def llm_prompt_generation(): type=SystemEvent.EventType.LLM_PROMPT_GENERATION, subtype=SystemEvent.EventSubType.FAILURE, long_description=msg, - data={}, + data={ + "prompt_generation_profile": profile, + }, ) @@ -1827,8 +1855,8 @@ def llm_prompt_answering(): ) -def init_workload(seed: int) -> tuple[SolveWorkload, str]: - workload_uuid = uuid.uuid4() +def init_workload(seed: int, executor_class: ExecutorClass) -> tuple[SolveWorkload, str]: + workload_uuid = generate_workload_uuid() # generate an s3 url to upload workload prompts to s3_upload_url = generate_upload_url( key=str(workload_uuid), bucket_name=settings.S3_BUCKET_NAME_ANSWERS @@ -1838,21 +1866,35 @@ def init_workload(seed: int) -> tuple[SolveWorkload, str]: key=str(workload_uuid), bucket_name=settings.S3_BUCKET_NAME_ANSWERS, ) - return SolveWorkload(workload_uuid=workload_uuid, seed=seed, s3_url=s3_url), s3_upload_url + return (SolveWorkload(workload_uuid=workload_uuid, seed=seed, 
s3_url=s3_url, executor_class=executor_class), + s3_upload_url) -@app.task() +def generate_workload_uuid(): + return uuid.uuid4() + + +@app.task def llm_prompt_sampling(): + for executor_class in EXECUTOR_TO_PROMPT_GENERATION_PROFILE: + llm_prompt_sampling_for_executor_class(executor_class) + + +@app.task() +def llm_prompt_sampling_for_executor_class(executor_class: ExecutorClass): # generate new prompt samples if needed - num_prompt_series = PromptSeries.objects.count() + generation_profile = EXECUTOR_TO_PROMPT_GENERATION_PROFILE[executor_class] + num_prompt_series = PromptSeries.objects.filter(generation_profile=generation_profile).count() + # FIXME: I don't understand this math required_series_to_start_sampling = min( - config.DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY * 2, config.DYNAMIC_MAX_PROMPT_SERIES + config.DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY * 2, config.DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS ) if num_prompt_series < required_series_to_start_sampling: logger.warning( - "There are %s series in the db - expected %s for start sampling - skipping prompt sampling", + "There are %s series in the db generated with profile %s - expected %s to start sampling - skipping prompt sampling", num_prompt_series, + generation_profile, required_series_to_start_sampling, ) SystemEvent.objects.create( @@ -1862,19 +1904,26 @@ def llm_prompt_sampling(): data={ "prompt_series_count": num_prompt_series, "required_prompt_series_count_to_start_sampling": required_series_to_start_sampling, + "prompt_generation_profile": generation_profile, }, ) return - num_unused_prompt_samples = PromptSample.objects.filter(synthetic_job__isnull=True).count() + # check unused prompt samples for this executor_class + num_unused_prompt_samples = PromptSample.objects.filter( + workload__executor_class=executor_class, + synthetic_job__isnull=True + ).count() + num_needed_prompt_samples = ( config.DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY - num_unused_prompt_samples ) if num_needed_prompt_samples <= 0: logger.warning( - "There are already %s prompt samples in the db not used in synthetic jobs - skipping prompt sampling", + "There are already %s prompt samples for executor class %s in the db not used in synthetic jobs - skipping prompt sampling", num_unused_prompt_samples, + executor_class, ) SystemEvent.objects.create( type=SystemEvent.EventType.LLM_PROMPT_SAMPLING, @@ -1883,16 +1932,19 @@ def llm_prompt_sampling(): data={ "unused_prompt_samples_count": num_unused_prompt_samples, "target_unused_prompt_samples_count": config.DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY, + "executor_class": executor_class, }, ) return logger.info( - "We need %s more prompt samples in the db for synthetic jobs - generating prompt answering workloads", + "We need %s more prompt samples for the executor class %s in the db for synthetic jobs - generating prompt answering workloads", num_needed_prompt_samples, + executor_class, ) - num_workloads_created = create_sample_workloads(num_needed_prompt_samples) + num_workloads_created = create_sample_workloads(num_needed_prompt_samples, executor_class) + SystemEvent.objects.create( type=SystemEvent.EventType.LLM_PROMPT_SAMPLING, subtype=SystemEvent.EventSubType.NEW_WORKLOADS_CREATED, @@ -1901,6 +1953,7 @@ def llm_prompt_sampling(): "new_workloads_count": num_workloads_created, "prompt_series_count": num_prompt_series, "unused_prompt_samples_count": num_unused_prompt_samples, + "executor_class": executor_class, }, ) @@ -1919,20 +1972,23 @@ def persist_workload( logger.error(f"Failed 
to create workload {workload}") -def create_sample_workloads(num_needed_prompt_samples: int) -> int: +def create_sample_workloads(num_needed_prompt_samples: int, executor_class: ExecutorClass) -> int: """ Creates enough workloads to cover at least `num_needed_prompt_samples` prompt samples + for the given executor_class Returns the number of workloads created """ prompts_per_sample = config.DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES - prompts_per_workload = config.DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD + generation_profile = EXECUTOR_TO_PROMPT_GENERATION_PROFILE[executor_class] + generation_profile_spec = PROMPT_GENERATION_PROFILES[generation_profile] + prompts_per_workload = generation_profile_spec.num_prompts # set seed for the current synthetic jobs run seed = random.randint(0, MAX_SEED) # workload we are currently sampling for try: - current_workload, current_upload_url = init_workload(seed) + current_workload, current_upload_url = init_workload(seed, executor_class) except Exception as e: logger.error(f"Failed to create new workload: {e} - aborting prompt sampling") return 0 @@ -1945,9 +2001,18 @@ def create_sample_workloads(num_needed_prompt_samples: int) -> int: current_prompt_samples = [] current_prompts = [] - # assume we have sufficient prompt series in the db to make all the prompt_samples needed + # Iterating once over the series is not strictly correct as there's no guarantee that we'll manage + # to create the required number of workloads to sample num_needed_prompt_samples + # However, we consider it good enough, because the next run of llm_prompt_sampling will take care of it. + # Alternatively, we could iterate over the series multiple times, shuffling, until we have enough workloads, + # but then we'd have to worry about not getting into an infinite loop + # take a random order of prompt series to avoid using the same series at each synthetic jobs run - for prompt_series in PromptSeries.objects.order_by("?").all(): + series_for_generator = PromptSeries.objects.filter( + generation_profile=generation_profile + ).order_by("?").all() + + for prompt_series in series_for_generator: # get all prompts try: lines = download_prompts_from_s3_url(prompt_series.s3_url) @@ -1958,12 +2023,16 @@ def create_sample_workloads(num_needed_prompt_samples: int) -> int: type=SystemEvent.EventType.LLM_PROMPT_SAMPLING, subtype=SystemEvent.EventSubType.ERROR_DOWNLOADING_FROM_S3, long_description=msg, + data={ + "prompt_series_id": prompt_series.id, + "s3_url": prompt_series.s3_url, + }, ) continue # should always have enough prompts if len(lines) <= prompts_per_sample: - logger.error(f"Skipping bucket {prompt_series.s3_url}, not enough prompts") + logger.error(f"Skipping bucket {prompt_series.s3_url}, not enough prompts: {len(lines)} <= {prompts_per_sample}") continue # sample prompts @@ -1989,6 +2058,10 @@ def create_sample_workloads(num_needed_prompt_samples: int) -> int: SystemEvent.objects.create( type=SystemEvent.EventType.LLM_PROMPT_SAMPLING, subtype=SystemEvent.EventSubType.ERROR_UPLOADING_TO_S3, + data={ + "workload_uuid": str(current_workload.workload_uuid), + "upload_url": current_upload_url, + }, long_description=msg, ) @@ -2001,7 +2074,7 @@ def create_sample_workloads(num_needed_prompt_samples: int) -> int: current_prompt_samples = [] current_prompts = [] try: - current_workload, current_upload_url = init_workload(seed) + current_workload, current_upload_url = init_workload(seed, executor_class) except Exception as e: logger.error(f"Failed to create new workload: {e} - aborting 
prompt sampling") continue diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_llm_tasks.py b/validator/app/src/compute_horde_validator/validator/tests/test_llm_tasks.py index 88d02117e..24cf81dcb 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_llm_tasks.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_llm_tasks.py @@ -1,6 +1,18 @@ +import uuid +from constance import config from unittest.mock import patch +from unittest.mock import Mock +from unittest.mock import AsyncMock import pytest +from compute_horde_core.executor_class import ExecutorClass + +from compute_horde_validator.validator.generation_profile import ( + PromptGenerationProfile, + GenerationProfileSpec, + PROMPT_GENERATION_PROFILES, + EXECUTOR_TO_PROMPT_GENERATION_PROFILE, +) from compute_horde_validator.validator.models import ( Prompt, @@ -9,23 +21,34 @@ SolveWorkload, SystemEvent, ) -from compute_horde_validator.validator.tasks import llm_prompt_generation, llm_prompt_sampling +from compute_horde_validator.validator.tasks import llm_prompt_generation, llm_prompt_sampling, llm_prompt_answering +from compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts import LlmPromptsJobGenerator - -def create_prompt_series(num: int): +def create_prompt_series(num: int, generation_profile=PromptGenerationProfile.default_a6000, s3_url=""): PromptSeries.objects.bulk_create( - [PromptSeries(s3_url="", generator_version=1) for _ in range(num)] + [PromptSeries(s3_url=s3_url, generator_version=1, generation_profile=generation_profile) for _ in range(num)] ) +def default_generation_profiles_configuration_for_a6000_only(): + EXECUTOR_TO_PROMPT_GENERATION_PROFILE.clear() + PROMPT_GENERATION_PROFILES.clear() + + EXECUTOR_TO_PROMPT_GENERATION_PROFILE[ExecutorClass.always_on__llm__a6000] = PromptGenerationProfile.default_a6000 + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a6000] = GenerationProfileSpec( + description="The default generation profile for a6000 cards", + num_prompts=240) + + @pytest.mark.override_config(DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=5) @pytest.mark.django_db(transaction=True) def test_llm_prompt_sampling__will_not_trigger(): + default_generation_profiles_configuration_for_a6000_only() create_prompt_series(5) prompt_series = PromptSeries.objects.create(s3_url="", generator_version=1) for i in range(5): workload = SolveWorkload.objects.create(seed=i, s3_url="s3://test") - PromptSample(series=prompt_series, workload=workload) + # PromptSample(series=prompt_series, workload=workload) with patch("compute_horde_validator.validator.tasks.create_sample_workloads", lambda *_: 0): llm_prompt_sampling() @@ -45,6 +68,7 @@ def test_llm_prompt_sampling__will_not_trigger(): lambda *args: ["test" for _ in range(10)], ) def test_llm_prompt_sampling__fail_upload_to_s3(): + default_generation_profiles_configuration_for_a6000_only() create_prompt_series(4) llm_prompt_sampling() assert SolveWorkload.objects.count() == 0 @@ -56,6 +80,7 @@ def test_llm_prompt_sampling__fail_upload_to_s3(): @pytest.mark.django_db(transaction=True) @patch("compute_horde_validator.validator.tasks.download_prompts_from_s3_url", lambda *args: []) def test_llm_prompt_sampling__fail_download_from_s3(): + default_generation_profiles_configuration_for_a6000_only() create_prompt_series(4) llm_prompt_sampling() assert SolveWorkload.objects.count() == 0 @@ -66,16 +91,17 @@ def test_llm_prompt_sampling__fail_download_from_s3(): @pytest.mark.override_config( 
DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=5, DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES=10, - DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD=20, ) @pytest.mark.django_db(transaction=True) @patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", lambda *args: True) @patch( "compute_horde_validator.validator.tasks.download_prompts_from_s3_url", - lambda *args: ["test" for _ in range(240)], + lambda *args: ["test" for _ in range(20)], ) def test_llm_prompt_sampling__success(): - create_prompt_series(10) + default_generation_profiles_configuration_for_a6000_only() + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a6000].num_prompts = 20 + create_prompt_series(10, generation_profile=PromptGenerationProfile.default_a6000) llm_prompt_sampling() assert SolveWorkload.objects.count() == 3 assert PromptSample.objects.count() == 6 @@ -85,15 +111,18 @@ def test_llm_prompt_sampling__success(): @pytest.mark.override_config( DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=4, DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES=100, - DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD=80, ) @pytest.mark.django_db(transaction=True) @patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", lambda *args: True) @patch( "compute_horde_validator.validator.tasks.download_prompts_from_s3_url", - lambda *args: ["test" for _ in range(240)], + lambda *args: ["test" for _ in range(101)], ) def test_llm_prompt_sampling__one_sample_per_workload(): + # FIXME: this test doesn't seem to make sense; + # FIXME: it tests series larger than solve workloads, but is this even a valid configuration? + default_generation_profiles_configuration_for_a6000_only() + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a6000].num_prompts = 100 create_prompt_series(8) llm_prompt_sampling() assert SolveWorkload.objects.count() == 4 @@ -104,15 +133,16 @@ def test_llm_prompt_sampling__one_sample_per_workload(): @pytest.mark.override_config( DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=1, DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES=1, - DYNAMIC_NUMBER_OF_PROMPTS_PER_WORKLOAD=5, ) @pytest.mark.django_db(transaction=True) @patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", lambda *args: True) @patch( "compute_horde_validator.validator.tasks.download_prompts_from_s3_url", - lambda *args: ["test" for _ in range(240)], + lambda *args: ["test" for _ in range(5)], ) def test_llm_prompt_sampling__not_enough_for_one_workload(): + default_generation_profiles_configuration_for_a6000_only() + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a6000].num_prompts = 5 create_prompt_series(4) llm_prompt_sampling() assert SolveWorkload.objects.count() == 0 @@ -120,20 +150,260 @@ def test_llm_prompt_sampling__not_enough_for_one_workload(): assert Prompt.objects.count() == 0 -@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES=5) +@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS=5) @pytest.mark.django_db(transaction=True) def test_llm_prompt_generation__will_trigger(): + default_generation_profiles_configuration_for_a6000_only() create_prompt_series(4) with patch("compute_horde_validator.validator.tasks.generate_prompts") as mock_generate_prompts: llm_prompt_generation() assert mock_generate_prompts.called -@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES=5) +@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS=5) @pytest.mark.django_db(transaction=True) def test_llm_prompt_generation__will_not_trigger(): + 
default_generation_profiles_configuration_for_a6000_only() create_prompt_series(10) with patch("compute_horde_validator.validator.tasks.generate_prompts") as mock_generate_prompts: llm_prompt_generation() assert mock_generate_prompts.not_called assert PromptSeries.objects.count() == 10 + + +@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS=5) +@pytest.mark.django_db(transaction=True) +def test_llm_prompt_generation__skips_generation_when_profile_unused(): + default_generation_profiles_configuration_for_a6000_only() + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a100] = GenerationProfileSpec( + description="Unused profile", + num_prompts=777) + + with patch("compute_horde_validator.validator.tasks.generate_prompts") as mock_generate_prompts: + llm_prompt_generation() + mock_generate_prompts.assert_awaited_once() + + +@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS=5) +@pytest.mark.django_db(transaction=True) +def test_llm_prompt_generation__skips_generation_when_workload_exists(): + default_generation_profiles_configuration_for_a6000_only() + + SolveWorkload.objects.create(seed=77, s3_url="s3://test", executor_class=ExecutorClass.always_on__llm__a6000) + with patch("compute_horde_validator.validator.tasks.generate_prompts") as mock_generate_prompts: + llm_prompt_generation() + mock_generate_prompts.assert_not_awaited() + assert PromptSeries.objects.count() == 0 + + +@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS=5, DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION=3) +@pytest.mark.django_db(transaction=True) +def test_llm_prompt_generation__generates_enough_series_to_satisfy_all_executors_sharing_a_generation_profile(): + default_generation_profiles_configuration_for_a6000_only() + # two executor classes use the same generation profile + EXECUTOR_TO_PROMPT_GENERATION_PROFILE[ExecutorClass.always_on__llm__a6000] = PromptGenerationProfile.default_a6000 + EXECUTOR_TO_PROMPT_GENERATION_PROFILE[ExecutorClass.always_on__gpu_24gb] = PromptGenerationProfile.default_a6000 + + # generate more series than needed for 1 executor class but not enough for both executor classes + existing_prompt_series_count = 6 + assert existing_prompt_series_count > config.DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS + assert existing_prompt_series_count < 2 * config.DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS + create_prompt_series(existing_prompt_series_count, generation_profile=PromptGenerationProfile.default_a6000) + + with patch("compute_horde_validator.validator.cross_validation.prompt_generation.run_organic_job") as mock_run_organic_job: + llm_prompt_generation() + assert mock_run_organic_job.await_count == 1 + + assert PromptSeries.objects.count() == existing_prompt_series_count + config.DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION + + +@pytest.mark.override_config(DYNAMIC_MAX_PROMPT_SERIES_PER_EXECUTOR_CLASS=5, DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION=3) +@pytest.mark.django_db(transaction=True) +def test_llm_prompt_generation__generates_prompts_for_multiple_profiles(): + default_generation_profiles_configuration_for_a6000_only() + # add another (fake) profile/executor pair + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a100] = GenerationProfileSpec( + description="Additional testing profile", + num_prompts=234) + EXECUTOR_TO_PROMPT_GENERATION_PROFILE[ExecutorClass.always_on__gpu_24gb] = PromptGenerationProfile.default_a100 + + with 
patch("compute_horde_validator.validator.cross_validation.prompt_generation.run_organic_job") as mock_run_organic_job: + llm_prompt_generation() + assert mock_run_organic_job.await_count == 2 + + assert PromptSeries.objects.count() == len(PROMPT_GENERATION_PROFILES) * config.DYNAMIC_PROMPTS_SERIES_IN_A_SINGLE_GENERATION + + +@pytest.mark.django_db(transaction=True) +def test_llm_prompt_sampling__skips_sampling_when_unused_prompt_samples_exist(): + default_generation_profiles_configuration_for_a6000_only() + create_prompt_series(5) + prompt_series = PromptSeries.objects.create(s3_url="", generator_version=1) + workload = SolveWorkload.objects.create(seed=17, s3_url="s3://test", executor_class=ExecutorClass.always_on__llm__a6000) + PromptSample.objects.create(series=prompt_series, workload=workload) + + llm_prompt_sampling() + assert ( + SystemEvent.objects.filter( + subtype=SystemEvent.EventSubType.PROMPT_SAMPLING_SKIPPED + ).count() + == 1 + ) + + +@pytest.mark.override_config( + DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=7, + DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES=12, +) +@pytest.mark.django_db(transaction=True) +@patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", lambda *args: True) +@patch( + "compute_horde_validator.validator.tasks.download_prompts_from_s3_url", + lambda s3_url: ["test" for _ in range(s3_url == 's3://a6000' and 24 or 36)] +) +@pytest.mark.django_db(transaction=True) +def test_llm_prompt_sampling__generates_samples_for_multiple_executor_classes(): + default_generation_profiles_configuration_for_a6000_only() + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a6000].num_prompts = 24 + # add another (fake) profile/executor pair + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a100] = GenerationProfileSpec( + description="Additional testing profile", + num_prompts=36) + EXECUTOR_TO_PROMPT_GENERATION_PROFILE[ExecutorClass.always_on__gpu_24gb] = PromptGenerationProfile.default_a100 + + create_prompt_series(55, generation_profile=PromptGenerationProfile.default_a6000, s3_url="s3://a6000") + create_prompt_series(78, generation_profile=PromptGenerationProfile.default_a100, s3_url="s3://a100") + + llm_prompt_sampling() + + # each workload for a6000 fits 24 prompts, so 2 samples, 12 prompts each + # to get 7 samples, we need 4 workloads + assert SolveWorkload.objects.filter(executor_class=ExecutorClass.always_on__llm__a6000).count() == 4 + assert ( + SystemEvent.objects.filter( + subtype=SystemEvent.EventSubType.NEW_WORKLOADS_CREATED, + data__executor_class=ExecutorClass.always_on__llm__a6000, + data__new_workloads_count=4, + ).count() + == 1 + ) + + # each workload for a100 fits 36 prompts, so 3 samples, 12 prompts each + # to get 7 samples, we need 3 workloads + assert SolveWorkload.objects.filter(executor_class=ExecutorClass.always_on__gpu_24gb).count() == 3 + assert ( + SystemEvent.objects.filter( + subtype=SystemEvent.EventSubType.NEW_WORKLOADS_CREATED, + data__executor_class=ExecutorClass.always_on__gpu_24gb, + data__new_workloads_count=3, + ).count() + == 1 + ) + + # 4 workloads for a6000 contain exactly 8 samples + assert PromptSample.objects.filter(workload__executor_class=ExecutorClass.always_on__llm__a6000).count() == 8 + + # 3 workloads for a100 contain 9 samples + assert PromptSample.objects.filter(workload__executor_class=ExecutorClass.always_on__gpu_24gb).count() == 9 + + assert Prompt.objects.count() == 8 * 12 + 9 * 12 + + + +@pytest.mark.override_config( + DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=7, + 
DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES=12, +) +@pytest.mark.django_db(transaction=True) +@patch( + "compute_horde_validator.validator.tasks.download_prompts_from_s3_url", + lambda s3_url: ["test" for _ in range(24)] +) +def test_llm_prompt_sampling__is_resilient_to_s3_upload_errors_when_creating_workloads(): + default_generation_profiles_configuration_for_a6000_only() + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a6000].num_prompts = 24 + + create_prompt_series(50, generation_profile=PromptGenerationProfile.default_a6000) + + with patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", + Mock(side_effect=[True, Exception("Upload failed"), True, True, True])): + llm_prompt_sampling() + + # there should have been upload error + assert ( + SystemEvent.objects.filter( + subtype=SystemEvent.EventSubType.ERROR_UPLOADING_TO_S3 + ).count() + == 1 + ) + + # but the workloads should have been created nevertheless + # each workload for a6000 fits 24 prompts, so 2 samples, 12 prompts each + # to get 7 samples, we need 4 workloads + assert SolveWorkload.objects.filter(executor_class=ExecutorClass.always_on__llm__a6000).count() == 4 + + # 4 workloads for a6000 contain exactly 8 samples + assert PromptSample.objects.filter(workload__executor_class=ExecutorClass.always_on__llm__a6000).count() == 8 + + assert Prompt.objects.count() == 8 * 12 + + +@pytest.mark.override_config( + DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=7, + DYNAMIC_NUMBER_OF_PROMPTS_TO_SAMPLE_FROM_SERIES=12, +) +@patch("compute_horde_validator.validator.tasks.upload_prompts_to_s3_url", lambda *args: True) +@pytest.mark.django_db(transaction=True) +def test_llm_prompt_sampling__is_resilient_to_s3_download_errors_when_creating_workloads(): + default_generation_profiles_configuration_for_a6000_only() + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a6000].num_prompts = 24 + + create_prompt_series(50, generation_profile=PromptGenerationProfile.default_a6000) + + with patch("compute_horde_validator.validator.tasks.download_prompts_from_s3_url", + Mock(side_effect=[["test" for _ in range(24)], + Exception("Download failed")] + + [["test" for _ in range(24)]] * 50)): + llm_prompt_sampling() + + # there should have been download error + assert ( + SystemEvent.objects.filter( + subtype=SystemEvent.EventSubType.ERROR_DOWNLOADING_FROM_S3 + ).count() + == 1 + ) + + # but the workloads should have been created nevertheless + # each workload for a6000 fits 24 prompts, so 2 samples, 12 prompts each + # to get 7 samples, we need 4 workloads + assert SolveWorkload.objects.filter(executor_class=ExecutorClass.always_on__llm__a6000).count() == 4 + + # 4 workloads for a6000 contain exactly 8 samples + assert PromptSample.objects.filter(workload__executor_class=ExecutorClass.always_on__llm__a6000).count() == 8 + + assert Prompt.objects.count() == 8 * 12 + + + +@pytest.mark.django_db(transaction=True) +def test_llm_prompt_answering__uses_executor_specified_by_workload(): + SolveWorkload.objects.create(seed=1, s3_url="s3://test1", executor_class=ExecutorClass.always_on__llm__a6000) + SolveWorkload.objects.create(seed=2, s3_url="s3://test2", executor_class=ExecutorClass.always_on__llm__a6000) + SolveWorkload.objects.create(seed=3, s3_url="s3://test3", executor_class=ExecutorClass.always_on__gpu_24gb) + SolveWorkload.objects.create(seed=4, s3_url="s3://test4", executor_class=ExecutorClass.always_on__llm__a6000) + + async def set_answers(self, *args, **kwargs): + self.prompt_answers = {f"prompt{i}": 
f"answer{i}" for i in range(10)} + + with patch("compute_horde_validator.validator.cross_validation.prompt_answering.run_organic_job") as mock_run_organic_job: + with patch.object(LlmPromptsJobGenerator, "download_answers", autospec=True, side_effect=set_answers): + + llm_prompt_answering() + assert mock_run_organic_job.await_count == 4 + assert mock_run_organic_job.await_args_list[0].args[1].executor_class == ExecutorClass.always_on__llm__a6000 + assert mock_run_organic_job.await_args_list[1].args[1].executor_class == ExecutorClass.always_on__llm__a6000 + assert mock_run_organic_job.await_args_list[2].args[1].executor_class == ExecutorClass.always_on__gpu_24gb + assert mock_run_organic_job.await_args_list[3].args[1].executor_class == ExecutorClass.always_on__llm__a6000 + + diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py index b7096f36a..17c02b707 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/mock_generator.py @@ -37,6 +37,7 @@ def verify_correctness(self, msg: V0JobFinishedRequest) -> tuple[bool, str]: def __init__(self, _uuid: uuid.UUID, **kwargs): super().__init__(**kwargs) self._uuid = _uuid + self._init_args = kwargs async def ainit(self, miner_hotkey: str): pass @@ -114,3 +115,4 @@ class TimeTookScoreMockSyntheticJobGeneratorFactory(MockSyntheticJobGeneratorFac async def create(self, executor_class: ExecutorClass, *args) -> BaseSyntheticJobGenerator: _uuid = self._uuids.pop(0) return TimeTookScoreMockSyntheticJobGenerator(_uuid) + diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_download_llm_prompt_answers.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_download_llm_prompt_answers.py index e5c09f2cb..db2e95baf 100644 --- a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_download_llm_prompt_answers.py +++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_download_llm_prompt_answers.py @@ -6,12 +6,13 @@ import httpx import pytest from compute_horde.protocol_messages import V0JobFinishedRequest +from compute_horde.executor_class import DEFAULT_LLM_EXECUTOR_CLASS + from pytest_mock import MockerFixture from compute_horde_validator.validator.synthetic_jobs.batch_run import ( _LLM_ANSWERS_DOWNLOAD_MAX_ATTEMPTS, _LLM_ANSWERS_DOWNLOAD_RETRY_MIN_BACKOFF, - LLM_EXECUTOR_CLASS, BatchConfig, BatchContext, LlmAnswerDownloadTask, @@ -26,7 +27,7 @@ def _create_mock_job(name): mock_job = MagicMock() - mock_job.executor_class = LLM_EXECUTOR_CLASS + mock_job.executor_class = DEFAULT_LLM_EXECUTOR_CLASS mock_job.job_generator = AsyncMock(spec=LlmPromptsSyntheticJobGenerator) mock_job.job_response = MagicMock(spec=V0JobFinishedRequest) mock_job.name = name diff --git a/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_llm_synthetic_job_generation.py b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_llm_synthetic_job_generation.py new file mode 100644 index 000000000..34c888a06 --- /dev/null +++ b/validator/app/src/compute_horde_validator/validator/tests/test_synthetic_jobs/test_llm_synthetic_job_generation.py @@ -0,0 +1,160 @@ +import asyncio +import uuid +from constance import 
config +from django.utils.timezone import now +from unittest.mock import patch +from unittest.mock import Mock +from unittest.mock import AsyncMock + +import pytest +from compute_horde.executor_class import LLM_EXECUTOR_CLASSES +from compute_horde_core.executor_class import ExecutorClass + +from compute_horde_validator.validator.generation_profile import ( + PromptGenerationProfile, + GenerationProfileSpec, + PROMPT_GENERATION_PROFILES, + EXECUTOR_TO_PROMPT_GENERATION_PROFILE, +) + +from compute_horde_validator.validator.models import ( + Prompt, + PromptSample, + PromptSeries, + SolveWorkload, + SystemEvent, +) +from compute_horde_validator.validator.tasks import llm_prompt_generation, llm_prompt_sampling, llm_prompt_answering +from compute_horde_validator.validator.synthetic_jobs.generator.llm_prompts import LlmPromptsJobGenerator +from compute_horde_validator.validator.synthetic_jobs.batch_run import _generate_jobs, BatchContext + +from .mock_generator import MockSyntheticJobGeneratorFactory + +@pytest.fixture +def job_generator_factory(): + job_uuids = [uuid.uuid4() for _ in range(13)] + return MockSyntheticJobGeneratorFactory(uuids=job_uuids) + +@pytest.mark.override_config(DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=5) +@pytest.mark.django_db(transaction=True) +def test_jobs_are_generated_for_executor_classes_which_have_samples(): + setup_two_profiles_and_two_llm_executor_classes() + + executors = { + ExecutorClass.always_on__gpu_24gb: 1, + ExecutorClass.always_on__llm__a6000: 2, + } + + # generate PromptSample for always_on__gpu_24gb class + prompt_sample = create_prompt_sample(ExecutorClass.always_on__gpu_24gb) + + # Create a BatchContext mock + hotkey = 'hotkey' + mock_ctx = create_batch_context(executors, hotkey) + + # generate jobs + asyncio.run(_generate_jobs(mock_ctx)) + + # verify only jobs for the executor class that has PromptSamples are generated + assert len(mock_ctx.jobs) == 1 + for job in mock_ctx.jobs.values(): + assert job.executor_class == ExecutorClass.always_on__gpu_24gb + + assert len(mock_ctx.job_generators[hotkey][ExecutorClass.always_on__llm__a6000]) == 0 + assert len(mock_ctx.job_generators[hotkey][ExecutorClass.always_on__gpu_24gb]) == 1 + assert mock_ctx.job_generators[hotkey][ExecutorClass.always_on__gpu_24gb][0]._init_args['prompt_sample'] == prompt_sample + assert mock_ctx.job_generators[hotkey][ExecutorClass.always_on__gpu_24gb][0]._init_args['expected_prompts'] == list(prompt_sample.prompts.all()) + + +@pytest.mark.override_config(DYNAMIC_TARGET_NUMBER_OF_PROMPT_SAMPLES_READY=5) +@pytest.mark.django_db(transaction=True) +def test_jobs_are_generated_for_multiple_executor_classes(): + setup_two_profiles_and_two_llm_executor_classes() + + executors = { + ExecutorClass.always_on__gpu_24gb: 1, + ExecutorClass.always_on__llm__a6000: 2, + } + + prompt_sample_gpu_24gb = create_prompt_sample(ExecutorClass.always_on__gpu_24gb) + prompt_sample_a6000_1 = create_prompt_sample(ExecutorClass.always_on__llm__a6000) + prompt_sample_a6000_2 = create_prompt_sample(ExecutorClass.always_on__llm__a6000) + + # Create a BatchContext mock + hotkey = 'hotkey' + mock_ctx = create_batch_context(executors, hotkey) + + # generate jobs + asyncio.run(_generate_jobs(mock_ctx)) + + # verify all the executors have jobs + assert len(mock_ctx.jobs) == 3 # 1 for gpu_24gb + 2 for a6000 + + # both jobs have relevant executor classes + executor_classes_in_jobs = {job.executor_class for job in mock_ctx.jobs.values()} + assert ExecutorClass.always_on__gpu_24gb in executor_classes_in_jobs + 
assert ExecutorClass.always_on__llm__a6000 in executor_classes_in_jobs + + # there are 1 and 2 job generators, respectively + assert len(mock_ctx.job_generators[hotkey][ExecutorClass.always_on__gpu_24gb]) == 1 + assert len(mock_ctx.job_generators[hotkey][ExecutorClass.always_on__llm__a6000]) == 2 + + gpu_24gb_generators = mock_ctx.job_generators[hotkey][ExecutorClass.always_on__gpu_24gb] + a6000_generators = mock_ctx.job_generators[hotkey][ExecutorClass.always_on__llm__a6000] + + # Collect all prompt samples used by gpu_24gb generators + gpu_24gb_used_samples = {gen._init_args['prompt_sample'] for gen in gpu_24gb_generators} + assert gpu_24gb_used_samples == {prompt_sample_gpu_24gb} + + # Collect all prompt samples used by a6000 generators + a6000_used_samples = {gen._init_args['prompt_sample'] for gen in a6000_generators} + assert a6000_used_samples == {prompt_sample_a6000_1, prompt_sample_a6000_2} + + for gen in gpu_24gb_generators: + assert gen._init_args['expected_prompts'] == list(gen._init_args['prompt_sample'].prompts.all()) + + for gen in a6000_generators: + assert gen._init_args['expected_prompts'] == list(gen._init_args['prompt_sample'].prompts.all()) + + +def setup_two_profiles_and_two_llm_executor_classes(): + LLM_EXECUTOR_CLASSES.clear() + LLM_EXECUTOR_CLASSES.add(ExecutorClass.always_on__llm__a6000) + LLM_EXECUTOR_CLASSES.add(ExecutorClass.always_on__gpu_24gb) + + EXECUTOR_TO_PROMPT_GENERATION_PROFILE.clear() + PROMPT_GENERATION_PROFILES.clear() + + EXECUTOR_TO_PROMPT_GENERATION_PROFILE[ExecutorClass.always_on__llm__a6000] = PromptGenerationProfile.default_a6000 + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a6000] = GenerationProfileSpec( + description="The default generation profile for a6000 cards", + num_prompts=24) + + EXECUTOR_TO_PROMPT_GENERATION_PROFILE[ExecutorClass.always_on__gpu_24gb] = PromptGenerationProfile.default_a100 + PROMPT_GENERATION_PROFILES[PromptGenerationProfile.default_a100] = GenerationProfileSpec( + description="Fake generation profile for a100 cards", + num_prompts=13) + + +def create_prompt_sample(executor_class: ExecutorClass): + prompt_series = PromptSeries.objects.create(s3_url="", generator_version=1, generation_profile=EXECUTOR_TO_PROMPT_GENERATION_PROFILE[executor_class]) + workload = SolveWorkload.objects.create(seed=17, s3_url="s3://test", executor_class=executor_class, finished_at=now()) + prompt_sample = PromptSample.objects.create(series=prompt_series, workload=workload) + for prompt_idx in range(13): + Prompt.objects.create(sample=prompt_sample, content=f"prompt{prompt_idx}", answer=f"answer{prompt_idx}") + + return prompt_sample + + +def create_batch_context(executors: dict[ExecutorClass, int], hotkey): + mock_ctx = Mock(spec=BatchContext) + mock_ctx.get_executor_count_per_class = BatchContext.get_executor_count_per_class.__get__(mock_ctx) + mock_ctx.executors = { + hotkey: executors + } + mock_ctx.names = {hotkey: 'test_miner'} + mock_ctx.jobs = {} + mock_ctx.job_uuids = [] + mock_ctx.job_generators = {hotkey: {}} + + return mock_ctx \ No newline at end of file