Merged
4 changes: 4 additions & 0 deletions contributing/RUNS-AND-JOBS.md
@@ -30,6 +30,8 @@ A run can spawn one or multiple jobs, depending on the configuration. A task tha
- STEP 4: Once all jobs are finished, the run becomes `TERMINATED`, `DONE`, or `FAILED` based on `RunTerminationReason`.
- STEP 0: If the run is `PENDING`, `background.tasks.process_runs` will resubmit jobs. The run becomes `SUBMITTED` again.

> Use `switch_run_status()` for all status transitions. Do not set `RunModel.status` directly.

> Only `services.runs.process_terminating_run` may assign a finished status to a run. To terminate a run, assign the `TERMINATING` status and a `RunTerminationReason`.

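For illustration, a minimal sketch of the intended pattern — assuming `switch_run_status` lives in `dstack._internal.server.services.runs` and takes `(session, run_model, status)`, by analogy with the `switch_job_status` and `switch_fleet_status` calls elsewhere in this diff:

```python
# Hypothetical sketch, not part of this PR. The import paths and the
# STOPPED_BY_USER reason are assumptions based on the surrounding code.
from dstack._internal.core.models.runs import RunStatus, RunTerminationReason
from dstack._internal.server.services.runs import switch_run_status  # assumed location

run_model.termination_reason = RunTerminationReason.STOPPED_BY_USER
# Request termination; never assign run_model.status directly.
switch_run_status(session, run_model, RunStatus.TERMINATING)
# services.runs.process_terminating_run later moves the run to a finished status.
```
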
### Services
@@ -68,6 +70,8 @@ Services' lifecycle has some modifications:
- Once `remove_at` is in the past, it stops the container via `dstack-shim`, detaches instance volumes, and releases the instance. The job becomes `TERMINATED`, `DONE`, `FAILED`, or `ABORTED` based on `JobTerminationReason`.
- If some volumes fail to detach, it keeps the job `TERMINATING` and re-checks the volume attachment status.

> Use `switch_job_status()` for all status transitions. Do not set `JobModel.status` directly.

> Only `services.jobs.process_terminating_job` may assign a finished status to a job. To terminate a job, assign the `TERMINATING` status and a `JobTerminationReason`.

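The job-side pattern looks the same; a condensed sketch of what this PR does at each termination site (the enum import path is an assumption):

```python
from dstack._internal.core.models.runs import JobStatus, JobTerminationReason  # assumed path
from dstack._internal.server.services.jobs import switch_job_status

# Record why the job is terminating, then request the transition;
# never assign job_model.status directly.
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
job_model.termination_reason_message = "Unexpected server error: ..."
switch_job_status(session, job_model, JobStatus.TERMINATING)
# services.jobs.process_terminating_job later assigns the finished status.
```
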
### Services' Jobs
20 changes: 16 additions & 4 deletions src/dstack/_internal/server/background/tasks/process_fleets.py
@@ -17,13 +17,16 @@
PlacementGroupModel,
RunModel,
)
from dstack._internal.server.services import events
from dstack._internal.server.services.fleets import (
create_fleet_instance_model,
get_fleet_spec,
get_next_instance_num,
is_fleet_empty,
is_fleet_in_use,
switch_fleet_status,
)
from dstack._internal.server.services.instances import format_instance_status_for_event
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.utils import sentry_utils
from dstack._internal.utils.common import get_current_datetime
@@ -121,7 +124,7 @@ async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel])
deleted_fleets_ids = []
for fleet_model in fleet_models:
_consolidate_fleet_state_with_spec(session, fleet_model)
deleted = _autodelete_fleet(fleet_model)
deleted = _autodelete_fleet(session, fleet_model)
if deleted:
deleted_fleets_ids.append(fleet_model.id)
fleet_model.last_processed_at = get_current_datetime()
@@ -228,17 +231,26 @@ def _maintain_fleet_nodes_in_min_max_range(
spec=fleet_spec,
instance_num=get_next_instance_num({i.instance_num for i in active_instances}),
)
events.emit(
session,
(
"Instance created to meet target fleet node count."
f" Status: {format_instance_status_for_event(instance_model)}"
),
actor=events.SystemActor(),
targets=[events.Target.from_model(instance_model)],
)
active_instances.append(instance_model)
fleet_model.instances.append(instance_model)
logger.info("Added %s instances to fleet %s", nodes_missing, fleet_model.name)
return True


def _autodelete_fleet(fleet_model: FleetModel) -> bool:
def _autodelete_fleet(session: AsyncSession, fleet_model: FleetModel) -> bool:
if fleet_model.project.deleted:
# It used to be possible to delete a project with active resources:
# https://github.com/dstackai/dstack/issues/3077
fleet_model.status = FleetStatus.TERMINATED
switch_fleet_status(session, fleet_model, FleetStatus.TERMINATED)
fleet_model.deleted = True
logger.info("Fleet %s deleted due to deleted project", fleet_model.name)
return True
@@ -256,7 +268,7 @@ def _autodelete_fleet(fleet_model: FleetModel) -> bool:
return False

logger.info("Automatic cleanup of an empty fleet %s", fleet_model.name)
fleet_model.status = FleetStatus.TERMINATED
switch_fleet_status(session, fleet_model, FleetStatus.TERMINATED)
fleet_model.deleted = True
logger.info("Fleet %s deleted", fleet_model.name)
return True
src/dstack/_internal/server/background/tasks/process_running_jobs.py
@@ -61,6 +61,7 @@
get_job_runtime_data,
is_master_job,
job_model_to_job_submission,
switch_job_status,
)
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.logging import fmt
@@ -164,8 +165,11 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
job_provisioning_data = job_submission.job_provisioning_data
if job_provisioning_data is None:
logger.error("%s: job_provisioning_data of an active job is None", fmt(job_model))
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
job_model.termination_reason_message = (
"Unexpected server error: job_provisioning_data of an active job is None"
)
switch_job_status(session, job_model, JobStatus.TERMINATING)
job_model.last_processed_at = common_utils.get_current_datetime()
await session.commit()
return
@@ -216,10 +220,9 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
try:
_interpolate_secrets(secrets, job.job_spec)
except InterpolatorError as e:
logger.info("%s: terminating due to secrets interpolation error", fmt(job_model))
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
job_model.termination_reason_message = e.args[0]
job_model.termination_reason_message = f"Secrets interpolation error: {e.args[0]}"
switch_job_status(session, job_model, JobStatus.TERMINATING)
job_model.last_processed_at = common_utils.get_current_datetime()
await session.commit()
return
@@ -230,7 +233,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):

if initial_status == JobStatus.PROVISIONING:
if job_provisioning_data.hostname is None:
await _wait_for_instance_provisioning_data(job_model=job_model)
await _wait_for_instance_provisioning_data(session, job_model)
job_model.last_processed_at = common_utils.get_current_datetime()
await session.commit()
return
@@ -258,6 +261,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
server_ssh_private_keys,
job_provisioning_data,
None,
session,
run,
job_model,
job_provisioning_data,
@@ -292,6 +296,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
server_ssh_private_keys,
job_provisioning_data,
None,
session,
run,
job_model,
job,
@@ -305,17 +310,17 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):

if not success:
# check timeout
if job_submission.age > get_provisioning_timeout(
provisioning_timeout = get_provisioning_timeout(
backend_type=job_provisioning_data.get_base_backend(),
instance_type_name=job_provisioning_data.instance_type.name,
):
logger.warning(
"%s: failed because runner has not become available in time, age=%s",
fmt(job_model),
job_submission.age,
)
job_model.status = JobStatus.TERMINATING
)
if job_submission.age > provisioning_timeout:
job_model.termination_reason = JobTerminationReason.WAITING_RUNNER_LIMIT_EXCEEDED
job_model.termination_reason_message = (
f"Runner did not become available within {provisioning_timeout.total_seconds()}s."
f" Job submission age: {job_submission.age.total_seconds()}s"
)
switch_job_status(session, job_model, JobStatus.TERMINATING)
# instance will be emptied by process_terminating_jobs

else: # failures are not acceptable
@@ -342,6 +347,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
server_ssh_private_keys,
job_provisioning_data,
None,
session,
run,
job_model,
job,
@@ -360,6 +366,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
server_ssh_private_keys,
job_provisioning_data,
job_submission.job_runtime_data,
session,
run_model,
job_model,
)
@@ -374,21 +381,17 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
job_model.termination_reason.value,
job_submission.age,
)
job_model.status = JobStatus.TERMINATING
switch_job_status(session, job_model, JobStatus.TERMINATING)
# job will be terminated and instance will be emptied by process_terminating_jobs
else:
# No job_model.termination_reason set means the SSH connection failed
if job_model.disconnected_at is None:
job_model.disconnected_at = common_utils.get_current_datetime()
if _should_terminate_job_due_to_disconnect(job_model):
logger.warning(
"%s: failed because instance is unreachable, age=%s",
fmt(job_model),
job_submission.age,
)
# TODO: Replace with JobTerminationReason.INSTANCE_UNREACHABLE for on-demand.
job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
job_model.status = JobStatus.TERMINATING
job_model.termination_reason_message = "Instance is unreachable"
switch_job_status(session, job_model, JobStatus.TERMINATING)
else:
logger.warning(
"%s: is unreachable, waiting for the instance to become reachable again, age=%s",
@@ -418,7 +421,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
await session.commit()


async def _wait_for_instance_provisioning_data(job_model: JobModel):
async def _wait_for_instance_provisioning_data(session: AsyncSession, job_model: JobModel):
"""
This function will be called until the instance IP address appears
in `job_model.instance.job_provisioning_data` or the instance is terminated on timeout.
@@ -437,8 +440,9 @@ async def _wait_for_instance_provisioning_data(job_model: JobModel):
return

if job_model.instance.status == InstanceStatus.TERMINATED:
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED
job_model.termination_reason_message = "Instance is terminated"
switch_job_status(session, job_model, JobStatus.TERMINATING)
return

job_model.job_provisioning_data = job_model.instance.job_provisioning_data
@@ -489,6 +493,7 @@ def _should_wait_for_other_nodes(run: Run, job: Job, job_model: JobModel) -> boo
@runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT], retries=1)
def _process_provisioning_with_shim(
ports: Dict[int, int],
session: AsyncSession,
run: Run,
job_model: JobModel,
job_provisioning_data: JobProvisioningData,
@@ -615,14 +620,14 @@ def _process_provisioning_with_shim(
shim_client.stop(force=True)
return False

job_model.status = JobStatus.PULLING
logger.info("%s: now is %s", fmt(job_model), job_model.status.name)
switch_job_status(session, job_model, JobStatus.PULLING)
return True


@runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT])
def _process_pulling_with_shim(
ports: Dict[int, int],
session: AsyncSession,
run: Run,
job_model: JobModel,
job: Job,
@@ -700,6 +705,7 @@ def _process_pulling_with_shim(
server_ssh_private_keys,
job_provisioning_data,
job_runtime_data,
session=session,
run=run,
job_model=job_model,
job=job,
@@ -715,6 +721,7 @@
@runner_ssh_tunnel(ports=[DSTACK_RUNNER_HTTP_PORT])
def _process_running(
ports: Dict[int, int],
session: AsyncSession,
run_model: RunModel,
job_model: JobModel,
) -> bool:
@@ -740,35 +747,37 @@ def _process_running(
runner_logs=resp.runner_logs,
job_logs=resp.job_logs,
)
previous_status = job_model.status
if len(resp.job_states) > 0:
latest_state_event = resp.job_states[-1]
latest_status = latest_state_event.state
if latest_status == JobStatus.DONE:
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.DONE_BY_RUNNER
switch_job_status(session, job_model, JobStatus.TERMINATING)
elif latest_status in {JobStatus.FAILED, JobStatus.TERMINATED}:
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.CONTAINER_EXITED_WITH_ERROR
if latest_state_event.termination_reason:
job_model.termination_reason = JobTerminationReason(
latest_state_event.termination_reason.lower()
)
if latest_state_event.termination_message:
job_model.termination_reason_message = latest_state_event.termination_message
switch_job_status(session, job_model, JobStatus.TERMINATING)
if (exit_status := latest_state_event.exit_status) is not None:
job_model.exit_status = exit_status
if exit_status != 0:
logger.info("%s: non-zero exit status %s", fmt(job_model), exit_status)
else:
_terminate_if_inactivity_duration_exceeded(run_model, job_model, resp.no_connections_secs)
if job_model.status != previous_status:
logger.info("%s: now is %s", fmt(job_model), job_model.status.name)
_terminate_if_inactivity_duration_exceeded(
session, run_model, job_model, resp.no_connections_secs
)
return True


def _terminate_if_inactivity_duration_exceeded(
run_model: RunModel, job_model: JobModel, no_connections_secs: Optional[int]
session: AsyncSession,
run_model: RunModel,
job_model: JobModel,
no_connections_secs: Optional[int],
) -> None:
conf = RunSpec.__response__.parse_raw(run_model.run_spec).configuration
if not isinstance(conf, DevEnvironmentConfiguration) or not isinstance(
@@ -781,20 +790,20 @@ def _terminate_if_inactivity_duration_exceeded(
job_model.inactivity_secs = no_connections_secs
if no_connections_secs is None:
# TODO(0.19 or earlier): make no_connections_secs required
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
job_model.termination_reason_message = (
"The selected instance was created before dstack 0.18.41"
" and does not support inactivity_duration"
)
switch_job_status(session, job_model, JobStatus.TERMINATING)
elif no_connections_secs >= conf.inactivity_duration:
job_model.status = JobStatus.TERMINATING
# TODO(0.19 or earlier): set JobTerminationReason.INACTIVITY_DURATION_EXCEEDED
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
job_model.termination_reason_message = (
f"The job was inactive for {no_connections_secs} seconds,"
f" exceeding the inactivity_duration of {conf.inactivity_duration} seconds"
)
switch_job_status(session, job_model, JobStatus.TERMINATING)


def _should_terminate_job_due_to_disconnect(job_model: JobModel) -> bool:
@@ -851,8 +860,10 @@ async def _maybe_register_replica(
fmt(job_model),
e,
)
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.GATEWAY_ERROR
# Not including e.args[0] in the message to avoid exposing internal details
job_model.termination_reason_message = "Failed to register service replica"
switch_job_status(session, job_model, JobStatus.TERMINATING)


async def _check_gpu_utilization(session: AsyncSession, job_model: JobModel, job: Job) -> None:
@@ -873,14 +884,14 @@ async def _check_gpu_utilization(session: AsyncSession, job_model: JobModel, job
if _should_terminate_due_to_low_gpu_util(
policy.min_gpu_utilization, [m.values for m in gpus_util_metrics]
):
logger.info("%s: GPU utilization check: terminating", fmt(job_model))
job_model.status = JobStatus.TERMINATING
logger.debug("%s: GPU utilization check: terminating", fmt(job_model))
# TODO(0.19 or earlier): set JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
job_model.termination_reason_message = (
f"The job GPU utilization was below {policy.min_gpu_utilization}%"
f" for {policy.time_window} seconds"
)
switch_job_status(session, job_model, JobStatus.TERMINATING)
else:
logger.debug("%s: GPU utilization check: OK", fmt(job_model))

@@ -998,6 +1009,7 @@ async def _get_job_file_archive(
@runner_ssh_tunnel(ports=[DSTACK_RUNNER_HTTP_PORT], retries=1)
def _submit_job_to_runner(
ports: Dict[int, int],
session: AsyncSession,
run: Run,
job_model: JobModel,
job: Job,
@@ -1053,7 +1065,7 @@ def _submit_job_to_runner(
logger.debug("%s: starting job", fmt(job_model))
runner_client.run_job()

job_model.status = JobStatus.RUNNING
switch_job_status(session, job_model, JobStatus.RUNNING)
# do not log here, because the runner will send a new status

return True