diff --git a/src/dstack/_internal/cli/services/configurators/run.py b/src/dstack/_internal/cli/services/configurators/run.py index ae15343fc6..6006e041de 100644 --- a/src/dstack/_internal/cli/services/configurators/run.py +++ b/src/dstack/_internal/cli/services/configurators/run.py @@ -64,7 +64,6 @@ from dstack._internal.utils.nested_list import NestedList, NestedListItem from dstack._internal.utils.path import is_absolute_posix_path from dstack.api._public.runs import Run -from dstack.api.server import APIClient from dstack.api.utils import load_profile _KNOWN_AMD_GPUS = {gpu.name.lower() for gpu in gpuhunt.KNOWN_AMD_GPUS} @@ -232,8 +231,6 @@ def apply_configuration( ) ) - _warn_fleet_autocreated(self.api.client, run) - console.print( f"\n[code]{run.name}[/] provisioning completed [secondary]({run.status.value})[/]" ) @@ -939,16 +936,3 @@ def render_run_spec_diff(old_spec: RunSpec, new_spec: RunSpec) -> Optional[str]: item = NestedListItem(spec_field.replace("_", " ").capitalize()) nested_list.children.append(item) return nested_list.render() - - -def _warn_fleet_autocreated(api: APIClient, run: Run): - if run._run.fleet is None: - return - fleet = api.fleets.get(project_name=run._project, name=run._run.fleet.name) - if not fleet.spec.autocreated: - return - warn( - f"\nThe run is using automatically created fleet [code]{fleet.name}[/code].\n" - "Future dstack versions won't create fleets automatically.\n" - "Create a fleet explicitly: https://dstack.ai/docs/concepts/fleets/" - ) diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 60e72a1afe..3e08fe9238 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -341,7 +341,7 @@ async def _process_submitted_job( job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() 
return - if FeatureFlags.AUTOCREATED_FLEETS_DISABLED: + if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED: logger.debug("%s: no fleet found", fmt(job_model)) job_model.status = JobStatus.TERMINATING job_model.termination_reason = ( diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 58c3529d59..a5b20b15b9 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -106,13 +106,10 @@ async def get_job_plans( exclude_not_available=False, ) if _should_force_non_fleet_offers(run_spec) or ( - not FeatureFlags.AUTOCREATED_FLEETS_DISABLED - and profile.fleets is None - and fleet_model is None + FeatureFlags.AUTOCREATED_FLEETS_ENABLED and profile.fleets is None and fleet_model is None ): # Keep the old behavior returning all offers irrespective of fleets. # Needed for supporting offers with autocreated fleets flow (and for `dstack offer`). - # TODO: Consider dropping when autocreated fleets are dropped. instance_offers, backend_offers = await _get_non_fleet_offers( session=session, project=project, @@ -248,7 +245,6 @@ async def find_optimal_fleet_with_offers( # the run without additional provisioning and choose the one with the cheapest pool offer. # Then choose a fleet with the cheapest pool offer among all fleets with pool offers. # If there are no fleets with pool offers, choose a fleet with a cheapest backend offer. - # Fallback to autocreated fleet if fleets have no pool or backend offers. # TODO: Consider trying all backend offers and then choosing a fleet. 
candidate_fleets_with_offers: list[ tuple[ @@ -325,7 +321,7 @@ async def find_optimal_fleet_with_offers( return None, [], [] if ( - not FeatureFlags.AUTOCREATED_FLEETS_DISABLED + FeatureFlags.AUTOCREATED_FLEETS_ENABLED and run_spec.merged_profile.fleets is None and all(t[3] == 0 and t[4] == 0 for t in candidate_fleets_with_offers) ): diff --git a/src/dstack/_internal/settings.py b/src/dstack/_internal/settings.py index 0462ddcdfb..84efb9254d 100644 --- a/src/dstack/_internal/settings.py +++ b/src/dstack/_internal/settings.py @@ -35,7 +35,9 @@ class FeatureFlags: development. Feature flags are environment variables of the form DSTACK_FF_* """ - AUTOCREATED_FLEETS_DISABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_DISABLED") is not None + # DSTACK_FF_AUTOCREATED_FLEETS_ENABLED enables legacy autocreated fleets: + # If there is no fleet suitable for the run, a new fleet is created automatically instead of failing with an error. + AUTOCREATED_FLEETS_ENABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_ENABLED") is not None # Enabling LEGACY_REPO_DIR_DISABLED does the following: # - Changes `working_dir` default value from `/workflow` to the image's working dir, unless diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index c22b45f0a5..8a3a4b1d57 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -631,9 +631,8 @@ async def test_cannot_assign_multi_node_job_to_partially_busy_shared_instance( await session.refresh(instance) res = await session.execute(select(JobModel).options(joinedload(JobModel.instance))) job = res.unique().scalar_one() - assert job.status == JobStatus.SUBMITTED - assert job.instance_assigned - assert job.instance is None + assert job.status == JobStatus.TERMINATING + assert job.termination_reason == 
JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY assert instance.total_blocks == 4 assert instance.busy_blocks == 1 @@ -724,41 +723,7 @@ async def test_creates_new_instance_in_existing_non_empty_fleet( @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session: AsyncSession): - project = await create_project(session) - user = await create_user(session) - repo = await create_repo(session=session, project_id=project.id) - fleet = await create_fleet(session=session, project=project) - instance = await create_instance( - session=session, - project=project, - fleet=fleet, - instance_num=0, - status=InstanceStatus.BUSY, - ) - fleet.instances.append(instance) - run = await create_run( - session=session, - project=project, - repo=repo, - user=user, - ) - job = await create_job( - session=session, - run=run, - instance_assigned=False, - ) - await session.commit() - await process_submitted_jobs() - await session.refresh(job) - assert job.status == JobStatus.SUBMITTED - assert job.instance_assigned - assert job.instance_id is None - assert job.fleet_id is None - - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_assigns_no_fleet_if_run_cannot_fit(self, test_db, session: AsyncSession): + async def test_fails_if_run_cannot_fit_into_fleet(self, test_db, session: AsyncSession): project = await create_project(session) user = await create_user(session) repo = await create_repo(session=session, project_id=project.id) @@ -800,37 +765,8 @@ async def test_assigns_no_fleet_if_run_cannot_fit(self, test_db, session: AsyncS await session.commit() await process_submitted_jobs() await session.refresh(job) - assert job.status == JobStatus.SUBMITTED - assert job.instance_assigned - assert job.instance_id is None - assert job.fleet_id is None - - @pytest.mark.asyncio - @pytest.mark.parametrize("test_db", 
["sqlite", "postgres"], indirect=True) - async def test_does_not_assign_job_to_elastic_empty_fleet_without_backend_offers_if_fleets_unspecified( - self, test_db, session: AsyncSession - ): - project = await create_project(session) - user = await create_user(session) - repo = await create_repo(session=session, project_id=project.id) - fleet_spec = get_fleet_spec() - fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) - await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet") - run = await create_run( - session=session, - project=project, - repo=repo, - user=user, - ) - job = await create_job( - session=session, - run=run, - instance_assigned=False, - ) - await process_submitted_jobs() - await session.refresh(job) - assert job.status == JobStatus.SUBMITTED - assert job.instance_assigned + assert job.status == JobStatus.TERMINATING + assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY assert job.instance_id is None assert job.fleet_id is None diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index dc27ef107c..602d2a9016 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -21,6 +21,7 @@ ServiceConfiguration, TaskConfiguration, ) +from dstack._internal.core.models.fleets import FleetNodesSpec from dstack._internal.core.models.gateways import GatewayStatus from dstack._internal.core.models.instances import ( InstanceAvailability, @@ -59,6 +60,7 @@ create_run, create_user, get_auth_headers, + get_fleet_spec, get_job_provisioning_data, get_run_spec, ) @@ -173,7 +175,7 @@ def get_dev_env_run_plan_dict( "stop_duration": None, "max_price": None, "retry": None, - "spot_policy": "spot", + "spot_policy": "auto", "idle_duration": None, "utilization_policy": None, "startup_order": None, @@ -198,7 +200,7 @@ def get_dev_env_run_plan_dict( "max_price": None, "name": "string", 
"retry": None, - "spot_policy": "spot", + "spot_policy": "auto", "idle_duration": None, "utilization_policy": None, "startup_order": None, @@ -249,7 +251,7 @@ def get_dev_env_run_plan_dict( "shm_size": None, }, "max_price": None, - "spot": True, + "spot": None, "reservation": None, "multinode": False, }, @@ -387,7 +389,7 @@ def get_dev_env_run_dict( "stop_duration": None, "max_price": None, "retry": None, - "spot_policy": "spot", + "spot_policy": "auto", "idle_duration": None, "utilization_policy": None, "startup_order": None, @@ -412,7 +414,7 @@ def get_dev_env_run_dict( "max_price": None, "name": "string", "retry": None, - "spot_policy": "spot", + "spot_policy": "auto", "idle_duration": None, "utilization_policy": None, "startup_order": None, @@ -458,7 +460,7 @@ def get_dev_env_run_dict( "shm_size": None, }, "max_price": None, - "spot": True, + "spot": None, "reservation": None, "multinode": False, }, @@ -967,12 +969,15 @@ async def test_returns_run_plan_privileged_false( await add_project_member( session=session, project=project, user=user, project_role=ProjectRole.USER ) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + await create_fleet(session=session, project=project, spec=fleet_spec) repo = await create_repo(session=session, project_id=project.id) offer_aws = InstanceOfferWithAvailability( backend=BackendType.AWS, instance=InstanceType( name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), + resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]), ), region="us", price=1.0, @@ -982,7 +987,7 @@ async def test_returns_run_plan_privileged_false( backend=BackendType.RUNPOD, instance=InstanceType( name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), + resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]), ), region="us", price=2.0, @@ -1030,12 +1035,15 @@ async def test_returns_run_plan_privileged_true( await 
add_project_member( session=session, project=project, user=user, project_role=ProjectRole.USER ) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + await create_fleet(session=session, project=project, spec=fleet_spec) repo = await create_repo(session=session, project_id=project.id) offer_aws = InstanceOfferWithAvailability( backend=BackendType.AWS, instance=InstanceType( name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), + resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]), ), region="us", price=1.0, @@ -1045,7 +1053,7 @@ async def test_returns_run_plan_privileged_true( backend=BackendType.RUNPOD, instance=InstanceType( name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), + resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]), ), region="us", price=2.0, @@ -1090,12 +1098,15 @@ async def test_returns_run_plan_docker_true( await add_project_member( session=session, project=project, user=user, project_role=ProjectRole.USER ) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + await create_fleet(session=session, project=project, spec=fleet_spec) repo = await create_repo(session=session, project_id=project.id) offer_aws = InstanceOfferWithAvailability( backend=BackendType.AWS, instance=InstanceType( name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), + resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]), ), region="us", price=1.0, @@ -1105,7 +1116,7 @@ async def test_returns_run_plan_docker_true( backend=BackendType.RUNPOD, instance=InstanceType( name="instance", - resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]), + resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]), ), region="us", price=2.0, @@ -1150,6 +1161,9 @@ async def test_returns_run_plan_instance_volumes( await 
add_project_member( session=session, project=project, user=user, project_role=ProjectRole.USER ) + fleet_spec = get_fleet_spec() + fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None) + await create_fleet(session=session, project=project, spec=fleet_spec) repo = await create_repo(session=session, project_id=project.id) offer_aws = InstanceOfferWithAvailability( backend=BackendType.AWS,