Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions src/dstack/_internal/cli/services/configurators/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
from dstack._internal.utils.nested_list import NestedList, NestedListItem
from dstack._internal.utils.path import is_absolute_posix_path
from dstack.api._public.runs import Run
from dstack.api.server import APIClient
from dstack.api.utils import load_profile

_KNOWN_AMD_GPUS = {gpu.name.lower() for gpu in gpuhunt.KNOWN_AMD_GPUS}
Expand Down Expand Up @@ -232,8 +231,6 @@ def apply_configuration(
)
)

_warn_fleet_autocreated(self.api.client, run)

console.print(
f"\n[code]{run.name}[/] provisioning completed [secondary]({run.status.value})[/]"
)
Expand Down Expand Up @@ -939,16 +936,3 @@ def render_run_spec_diff(old_spec: RunSpec, new_spec: RunSpec) -> Optional[str]:
item = NestedListItem(spec_field.replace("_", " ").capitalize())
nested_list.children.append(item)
return nested_list.render()


def _warn_fleet_autocreated(api: APIClient, run: Run):
if run._run.fleet is None:
return
fleet = api.fleets.get(project_name=run._project, name=run._run.fleet.name)
if not fleet.spec.autocreated:
return
warn(
f"\nThe run is using automatically created fleet [code]{fleet.name}[/code].\n"
"Future dstack versions won't create fleets automatically.\n"
"Create a fleet explicitly: https://dstack.ai/docs/concepts/fleets/"
)
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ async def _process_submitted_job(
job_model.last_processed_at = common_utils.get_current_datetime()
await session.commit()
return
if FeatureFlags.AUTOCREATED_FLEETS_DISABLED:
if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED:
logger.debug("%s: no fleet found", fmt(job_model))
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = (
Expand Down
8 changes: 2 additions & 6 deletions src/dstack/_internal/server/services/runs/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,10 @@ async def get_job_plans(
exclude_not_available=False,
)
if _should_force_non_fleet_offers(run_spec) or (
not FeatureFlags.AUTOCREATED_FLEETS_DISABLED
and profile.fleets is None
and fleet_model is None
FeatureFlags.AUTOCREATED_FLEETS_ENABLED and profile.fleets is None and fleet_model is None
):
# Keep the old behavior returning all offers irrespective of fleets.
# Needed for supporting offers with autocreated fleets flow (and for `dstack offer`).
# TODO: Consider dropping when autocreated fleets are dropped.
instance_offers, backend_offers = await _get_non_fleet_offers(
session=session,
project=project,
Expand Down Expand Up @@ -248,7 +245,6 @@ async def find_optimal_fleet_with_offers(
# the run without additional provisioning and choose the one with the cheapest pool offer.
# Then choose a fleet with the cheapest pool offer among all fleets with pool offers.
# If there are no fleets with pool offers, choose a fleet with a cheapest backend offer.
# Fallback to autocreated fleet if fleets have no pool or backend offers.
# TODO: Consider trying all backend offers and then choosing a fleet.
candidate_fleets_with_offers: list[
tuple[
Expand Down Expand Up @@ -325,7 +321,7 @@ async def find_optimal_fleet_with_offers(
return None, [], []

if (
not FeatureFlags.AUTOCREATED_FLEETS_DISABLED
FeatureFlags.AUTOCREATED_FLEETS_ENABLED
and run_spec.merged_profile.fleets is None
and all(t[3] == 0 and t[4] == 0 for t in candidate_fleets_with_offers)
):
Expand Down
4 changes: 3 additions & 1 deletion src/dstack/_internal/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ class FeatureFlags:
development. Feature flags are environment variables of the form DSTACK_FF_*
"""

AUTOCREATED_FLEETS_DISABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_DISABLED") is not None
# DSTACK_FF_AUTOCREATED_FLEETS_ENABLED enables legacy autocreated fleets:
# If there are no fleet suitable for the run, a new fleet is created automatically instead of an error.
AUTOCREATED_FLEETS_ENABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_ENABLED") is not None

# Enabling LEGACY_REPO_DIR_DISABLED does the following:
# - Changes `working_dir` default value from `/workflow` to the image's working dir, unless
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,8 @@ async def test_cannot_assign_multi_node_job_to_partially_busy_shared_instance(
await session.refresh(instance)
res = await session.execute(select(JobModel).options(joinedload(JobModel.instance)))
job = res.unique().scalar_one()
assert job.status == JobStatus.SUBMITTED
assert job.instance_assigned
assert job.instance is None
assert job.status == JobStatus.TERMINATING
assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
assert instance.total_blocks == 4
assert instance.busy_blocks == 1

Expand Down Expand Up @@ -724,41 +723,7 @@ async def test_creates_new_instance_in_existing_non_empty_fleet(

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_assigns_no_fleet_when_all_fleets_occupied(self, test_db, session: AsyncSession):
project = await create_project(session)
user = await create_user(session)
repo = await create_repo(session=session, project_id=project.id)
fleet = await create_fleet(session=session, project=project)
instance = await create_instance(
session=session,
project=project,
fleet=fleet,
instance_num=0,
status=InstanceStatus.BUSY,
)
fleet.instances.append(instance)
run = await create_run(
session=session,
project=project,
repo=repo,
user=user,
)
job = await create_job(
session=session,
run=run,
instance_assigned=False,
)
await session.commit()
await process_submitted_jobs()
await session.refresh(job)
assert job.status == JobStatus.SUBMITTED
assert job.instance_assigned
assert job.instance_id is None
assert job.fleet_id is None

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_assigns_no_fleet_if_run_cannot_fit(self, test_db, session: AsyncSession):
async def test_fails_if_run_cannot_fit_into_fleet(self, test_db, session: AsyncSession):
project = await create_project(session)
user = await create_user(session)
repo = await create_repo(session=session, project_id=project.id)
Expand Down Expand Up @@ -800,37 +765,8 @@ async def test_assigns_no_fleet_if_run_cannot_fit(self, test_db, session: AsyncS
await session.commit()
await process_submitted_jobs()
await session.refresh(job)
assert job.status == JobStatus.SUBMITTED
assert job.instance_assigned
assert job.instance_id is None
assert job.fleet_id is None

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_does_not_assign_job_to_elastic_empty_fleet_without_backend_offers_if_fleets_unspecified(
self, test_db, session: AsyncSession
):
project = await create_project(session)
user = await create_user(session)
repo = await create_repo(session=session, project_id=project.id)
fleet_spec = get_fleet_spec()
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1)
await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet")
run = await create_run(
session=session,
project=project,
repo=repo,
user=user,
)
job = await create_job(
session=session,
run=run,
instance_assigned=False,
)
await process_submitted_jobs()
await session.refresh(job)
assert job.status == JobStatus.SUBMITTED
assert job.instance_assigned
assert job.status == JobStatus.TERMINATING
assert job.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
assert job.instance_id is None
assert job.fleet_id is None

Expand Down
38 changes: 26 additions & 12 deletions src/tests/_internal/server/routers/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ServiceConfiguration,
TaskConfiguration,
)
from dstack._internal.core.models.fleets import FleetNodesSpec
from dstack._internal.core.models.gateways import GatewayStatus
from dstack._internal.core.models.instances import (
InstanceAvailability,
Expand Down Expand Up @@ -59,6 +60,7 @@
create_run,
create_user,
get_auth_headers,
get_fleet_spec,
get_job_provisioning_data,
get_run_spec,
)
Expand Down Expand Up @@ -173,7 +175,7 @@ def get_dev_env_run_plan_dict(
"stop_duration": None,
"max_price": None,
"retry": None,
"spot_policy": "spot",
"spot_policy": "auto",
"idle_duration": None,
"utilization_policy": None,
"startup_order": None,
Expand All @@ -198,7 +200,7 @@ def get_dev_env_run_plan_dict(
"max_price": None,
"name": "string",
"retry": None,
"spot_policy": "spot",
"spot_policy": "auto",
"idle_duration": None,
"utilization_policy": None,
"startup_order": None,
Expand Down Expand Up @@ -249,7 +251,7 @@ def get_dev_env_run_plan_dict(
"shm_size": None,
},
"max_price": None,
"spot": True,
"spot": None,
"reservation": None,
"multinode": False,
},
Expand Down Expand Up @@ -387,7 +389,7 @@ def get_dev_env_run_dict(
"stop_duration": None,
"max_price": None,
"retry": None,
"spot_policy": "spot",
"spot_policy": "auto",
"idle_duration": None,
"utilization_policy": None,
"startup_order": None,
Expand All @@ -412,7 +414,7 @@ def get_dev_env_run_dict(
"max_price": None,
"name": "string",
"retry": None,
"spot_policy": "spot",
"spot_policy": "auto",
"idle_duration": None,
"utilization_policy": None,
"startup_order": None,
Expand Down Expand Up @@ -458,7 +460,7 @@ def get_dev_env_run_dict(
"shm_size": None,
},
"max_price": None,
"spot": True,
"spot": None,
"reservation": None,
"multinode": False,
},
Expand Down Expand Up @@ -967,12 +969,15 @@ async def test_returns_run_plan_privileged_false(
await add_project_member(
session=session, project=project, user=user, project_role=ProjectRole.USER
)
fleet_spec = get_fleet_spec()
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
await create_fleet(session=session, project=project, spec=fleet_spec)
repo = await create_repo(session=session, project_id=project.id)
offer_aws = InstanceOfferWithAvailability(
backend=BackendType.AWS,
instance=InstanceType(
name="instance",
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
),
region="us",
price=1.0,
Expand All @@ -982,7 +987,7 @@ async def test_returns_run_plan_privileged_false(
backend=BackendType.RUNPOD,
instance=InstanceType(
name="instance",
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
),
region="us",
price=2.0,
Expand Down Expand Up @@ -1030,12 +1035,15 @@ async def test_returns_run_plan_privileged_true(
await add_project_member(
session=session, project=project, user=user, project_role=ProjectRole.USER
)
fleet_spec = get_fleet_spec()
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
await create_fleet(session=session, project=project, spec=fleet_spec)
repo = await create_repo(session=session, project_id=project.id)
offer_aws = InstanceOfferWithAvailability(
backend=BackendType.AWS,
instance=InstanceType(
name="instance",
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
),
region="us",
price=1.0,
Expand All @@ -1045,7 +1053,7 @@ async def test_returns_run_plan_privileged_true(
backend=BackendType.RUNPOD,
instance=InstanceType(
name="instance",
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
),
region="us",
price=2.0,
Expand Down Expand Up @@ -1090,12 +1098,15 @@ async def test_returns_run_plan_docker_true(
await add_project_member(
session=session, project=project, user=user, project_role=ProjectRole.USER
)
fleet_spec = get_fleet_spec()
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
await create_fleet(session=session, project=project, spec=fleet_spec)
repo = await create_repo(session=session, project_id=project.id)
offer_aws = InstanceOfferWithAvailability(
backend=BackendType.AWS,
instance=InstanceType(
name="instance",
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
),
region="us",
price=1.0,
Expand All @@ -1105,7 +1116,7 @@ async def test_returns_run_plan_docker_true(
backend=BackendType.RUNPOD,
instance=InstanceType(
name="instance",
resources=Resources(cpus=1, memory_mib=512, spot=False, gpus=[]),
resources=Resources(cpus=2, memory_mib=8192, spot=False, gpus=[]),
),
region="us",
price=2.0,
Expand Down Expand Up @@ -1150,6 +1161,9 @@ async def test_returns_run_plan_instance_volumes(
await add_project_member(
session=session, project=project, user=user, project_role=ProjectRole.USER
)
fleet_spec = get_fleet_spec()
fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
await create_fleet(session=session, project=project, spec=fleet_spec)
repo = await create_repo(session=session, project_id=project.id)
offer_aws = InstanceOfferWithAvailability(
backend=BackendType.AWS,
Expand Down