From 6fe89ac0b2c8b42c50658f810b4d6d3fd0dc1fc6 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Thu, 11 Dec 2025 01:07:24 +0100 Subject: [PATCH] Add more events - All Run/Job/Fleet status transitions - All Job/Instance creation cases - Run deletion --- contributing/RUNS-AND-JOBS.md | 4 + .../server/background/tasks/process_fleets.py | 20 ++++- .../background/tasks/process_running_jobs.py | 86 +++++++++++-------- .../server/background/tasks/process_runs.py | 21 ++--- .../tasks/process_submitted_jobs.py | 71 +++++++++------ src/dstack/_internal/server/models.py | 3 + src/dstack/_internal/server/routers/runs.py | 7 +- .../_internal/server/services/fleets.py | 71 ++++++++++++--- .../_internal/server/services/instances.py | 7 ++ .../server/services/jobs/__init__.py | 75 +++++++++++----- .../server/services/runs/__init__.py | 73 +++++++++++----- .../server/services/runs/replicas.py | 17 +++- src/dstack/_internal/utils/interpolator.py | 7 ++ 13 files changed, 324 insertions(+), 138 deletions(-) diff --git a/contributing/RUNS-AND-JOBS.md b/contributing/RUNS-AND-JOBS.md index b2c0430af4..32725b7353 100644 --- a/contributing/RUNS-AND-JOBS.md +++ b/contributing/RUNS-AND-JOBS.md @@ -30,6 +30,8 @@ A run can spawn one or multiple jobs, depending on the configuration. A task tha - STEP 4: Once all jobs are finished, the run becomes `TERMINATED`, `DONE`, or `FAILED` based on `RunTerminationReason`. - STEP 0: If the run is `PENDING`, `background.tasks.process_runs` will resubmit jobs. The run becomes `SUBMITTED` again. +> Use `switch_run_status()` for all status transitions. Do not set `RunModel.status` directly. + > No one must assign the finished status to the run, except `services.runs.process_terminating_run`. To terminate the run, assign `TERMINATING` status and `RunTerminationReason`. ### Services @@ -68,6 +70,8 @@ Services' lifecycle has some modifications: - Once `remove_at` is in the past, it stops the container via `dstack-shim`, detaches instance volumes, and releases the instance. The job becomes `TERMINATED`, `DONE`, `FAILED`, or `ABORTED` based on `JobTerminationReason`. - If some volumes fail to detach, it keeps the job `TERMINATING` and checks volumes attachment status. +> Use `switch_job_status()` for all status transitions. Do not set `JobModel.status` directly. + > No one must assign the finished status to the job, except `services.jobs.process_terminating_job`. To terminate the job, assign `TERMINATING` status and `JobTerminationReason`. 
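+
+For example, a minimal sketch of terminating a job under this convention (illustrative only; assumes `session` and a loaded `job_model` are in scope, and the reason values are placeholders):
+
+```python
+# Set the termination reason (and preferably a human-readable message) first,
+# then switch the status via the helper, which also emits a status-change event.
+job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
+job_model.termination_reason_message = "illustrative reason"
+switch_job_status(session, job_model, JobStatus.TERMINATING)
+```
+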
### Services' Jobs diff --git a/src/dstack/_internal/server/background/tasks/process_fleets.py b/src/dstack/_internal/server/background/tasks/process_fleets.py index 7b6cdc5de3..ffa83e10d7 100644 --- a/src/dstack/_internal/server/background/tasks/process_fleets.py +++ b/src/dstack/_internal/server/background/tasks/process_fleets.py @@ -17,13 +17,16 @@ PlacementGroupModel, RunModel, ) +from dstack._internal.server.services import events from dstack._internal.server.services.fleets import ( create_fleet_instance_model, get_fleet_spec, get_next_instance_num, is_fleet_empty, is_fleet_in_use, + switch_fleet_status, ) +from dstack._internal.server.services.instances import format_instance_status_for_event from dstack._internal.server.services.locking import get_locker from dstack._internal.server.utils import sentry_utils from dstack._internal.utils.common import get_current_datetime @@ -121,7 +124,7 @@ async def _process_fleets(session: AsyncSession, fleet_models: List[FleetModel]) deleted_fleets_ids = [] for fleet_model in fleet_models: _consolidate_fleet_state_with_spec(session, fleet_model) - deleted = _autodelete_fleet(fleet_model) + deleted = _autodelete_fleet(session, fleet_model) if deleted: deleted_fleets_ids.append(fleet_model.id) fleet_model.last_processed_at = get_current_datetime() @@ -228,17 +231,26 @@ def _maintain_fleet_nodes_in_min_max_range( spec=fleet_spec, instance_num=get_next_instance_num({i.instance_num for i in active_instances}), ) + events.emit( + session, + ( + "Instance created to meet target fleet node count." + f" Status: {format_instance_status_for_event(instance_model)}" + ), + actor=events.SystemActor(), + targets=[events.Target.from_model(instance_model)], + ) active_instances.append(instance_model) fleet_model.instances.append(instance_model) logger.info("Added %s instances to fleet %s", nodes_missing, fleet_model.name) return True -def _autodelete_fleet(fleet_model: FleetModel) -> bool: +def _autodelete_fleet(session: AsyncSession, fleet_model: FleetModel) -> bool: if fleet_model.project.deleted: # It used to be possible to delete project with active resources: # https://github.com/dstackai/dstack/issues/3077 - fleet_model.status = FleetStatus.TERMINATED + switch_fleet_status(session, fleet_model, FleetStatus.TERMINATED) fleet_model.deleted = True logger.info("Fleet %s deleted due to deleted project", fleet_model.name) return True @@ -256,7 +268,7 @@ def _autodelete_fleet(fleet_model: FleetModel) -> bool: return False logger.info("Automatic cleanup of an empty fleet %s", fleet_model.name) - fleet_model.status = FleetStatus.TERMINATED + switch_fleet_status(session, fleet_model, FleetStatus.TERMINATED) fleet_model.deleted = True logger.info("Fleet %s deleted", fleet_model.name) return True diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index ba2d96a135..341b47a38b 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -61,6 +61,7 @@ get_job_runtime_data, is_master_job, job_model_to_job_submission, + switch_job_status, ) from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.logging import fmt @@ -164,8 +165,11 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): job_provisioning_data = job_submission.job_provisioning_data if job_provisioning_data is None: logger.error("%s: 
job_provisioning_data of an active job is None", fmt(job_model)) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER + job_model.termination_reason_message = ( + "Unexpected server error: job_provisioning_data of an active job is None" + ) + switch_job_status(session, job_model, JobStatus.TERMINATING) job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() return @@ -216,10 +220,9 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): try: _interpolate_secrets(secrets, job.job_spec) except InterpolatorError as e: - logger.info("%s: terminating due to secrets interpolation error", fmt(job_model)) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER - job_model.termination_reason_message = e.args[0] + job_model.termination_reason_message = f"Secrets interpolation error: {e.args[0]}" + switch_job_status(session, job_model, JobStatus.TERMINATING) job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() return @@ -230,7 +233,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): if initial_status == JobStatus.PROVISIONING: if job_provisioning_data.hostname is None: - await _wait_for_instance_provisioning_data(job_model=job_model) + await _wait_for_instance_provisioning_data(session, job_model) job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() return @@ -258,6 +261,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): server_ssh_private_keys, job_provisioning_data, None, + session, run, job_model, job_provisioning_data, @@ -292,6 +296,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): server_ssh_private_keys, job_provisioning_data, None, + session, run, job_model, job, @@ -305,17 +310,17 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): if not success: # check timeout - if job_submission.age > get_provisioning_timeout( + provisioning_timeout = get_provisioning_timeout( backend_type=job_provisioning_data.get_base_backend(), instance_type_name=job_provisioning_data.instance_type.name, - ): - logger.warning( - "%s: failed because runner has not become available in time, age=%s", - fmt(job_model), - job_submission.age, - ) - job_model.status = JobStatus.TERMINATING + ) + if job_submission.age > provisioning_timeout: job_model.termination_reason = JobTerminationReason.WAITING_RUNNER_LIMIT_EXCEEDED + job_model.termination_reason_message = ( + f"Runner did not become available within {provisioning_timeout.total_seconds()}s." 
+ f" Job submission age: {job_submission.age.total_seconds()}s" ) + switch_job_status(session, job_model, JobStatus.TERMINATING) # instance will be emptied by process_terminating_jobs else: # fails are not acceptable @@ -342,6 +347,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): server_ssh_private_keys, job_provisioning_data, None, + session, run, job_model, job, @@ -360,6 +366,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): server_ssh_private_keys, job_provisioning_data, job_submission.job_runtime_data, + session, run_model, job_model, ) @@ -374,21 +381,17 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): job_model.termination_reason.value, job_submission.age, ) - job_model.status = JobStatus.TERMINATING + switch_job_status(session, job_model, JobStatus.TERMINATING) # job will be terminated and instance will be emptied by process_terminating_jobs else: # No job_model.termination_reason set means ssh connection failed if job_model.disconnected_at is None: job_model.disconnected_at = common_utils.get_current_datetime() if _should_terminate_job_due_to_disconnect(job_model): - logger.warning( - "%s: failed because instance is unreachable, age=%s", - fmt(job_model), - job_submission.age, - ) # TODO: Replace with JobTerminationReason.INSTANCE_UNREACHABLE for on-demand. job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY - job_model.status = JobStatus.TERMINATING + job_model.termination_reason_message = "Instance is unreachable" + switch_job_status(session, job_model, JobStatus.TERMINATING) else: logger.warning( "%s: is unreachable, waiting for the instance to become reachable again, age=%s", @@ -418,7 +421,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): await session.commit() -async def _wait_for_instance_provisioning_data(job_model: JobModel): +async def _wait_for_instance_provisioning_data(session: AsyncSession, job_model: JobModel): """ This function will be called until instance IP address appears in `job_model.instance.job_provisioning_data` or instance is terminated on timeout.
@@ -437,8 +440,9 @@ async def _wait_for_instance_provisioning_data(job_model: JobModel): return if job_model.instance.status == InstanceStatus.TERMINATED: - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED + job_model.termination_reason_message = "Instance is terminated" + switch_job_status(session, job_model, JobStatus.TERMINATING) return job_model.job_provisioning_data = job_model.instance.job_provisioning_data @@ -489,6 +493,7 @@ def _should_wait_for_other_nodes(run: Run, job: Job, job_model: JobModel) -> boo @runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT], retries=1) def _process_provisioning_with_shim( ports: Dict[int, int], + session: AsyncSession, run: Run, job_model: JobModel, job_provisioning_data: JobProvisioningData, @@ -615,14 +620,14 @@ def _process_provisioning_with_shim( shim_client.stop(force=True) return False - job_model.status = JobStatus.PULLING - logger.info("%s: now is %s", fmt(job_model), job_model.status.name) + switch_job_status(session, job_model, JobStatus.PULLING) return True @runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT]) def _process_pulling_with_shim( ports: Dict[int, int], + session: AsyncSession, run: Run, job_model: JobModel, job: Job, @@ -700,6 +705,7 @@ def _process_pulling_with_shim( server_ssh_private_keys, job_provisioning_data, job_runtime_data, + session=session, run=run, job_model=job_model, job=job, @@ -715,6 +721,7 @@ def _process_pulling_with_shim( @runner_ssh_tunnel(ports=[DSTACK_RUNNER_HTTP_PORT]) def _process_running( ports: Dict[int, int], + session: AsyncSession, run_model: RunModel, job_model: JobModel, ) -> bool: @@ -740,15 +747,13 @@ def _process_running( runner_logs=resp.runner_logs, job_logs=resp.job_logs, ) - previous_status = job_model.status if len(resp.job_states) > 0: latest_state_event = resp.job_states[-1] latest_status = latest_state_event.state if latest_status == JobStatus.DONE: - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.DONE_BY_RUNNER + switch_job_status(session, job_model, JobStatus.TERMINATING) elif latest_status in {JobStatus.FAILED, JobStatus.TERMINATED}: - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.CONTAINER_EXITED_WITH_ERROR if latest_state_event.termination_reason: job_model.termination_reason = JobTerminationReason( @@ -756,19 +761,23 @@ def _process_running( ) if latest_state_event.termination_message: job_model.termination_reason_message = latest_state_event.termination_message + switch_job_status(session, job_model, JobStatus.TERMINATING) if (exit_status := latest_state_event.exit_status) is not None: job_model.exit_status = exit_status if exit_status != 0: logger.info("%s: non-zero exit status %s", fmt(job_model), exit_status) else: - _terminate_if_inactivity_duration_exceeded(run_model, job_model, resp.no_connections_secs) - if job_model.status != previous_status: - logger.info("%s: now is %s", fmt(job_model), job_model.status.name) + _terminate_if_inactivity_duration_exceeded( + session, run_model, job_model, resp.no_connections_secs + ) return True def _terminate_if_inactivity_duration_exceeded( - run_model: RunModel, job_model: JobModel, no_connections_secs: Optional[int] + session: AsyncSession, + run_model: RunModel, + job_model: JobModel, + no_connections_secs: Optional[int], ) -> None: conf = RunSpec.__response__.parse_raw(run_model.run_spec).configuration if not isinstance(conf, DevEnvironmentConfiguration) or not isinstance( 
@@ -781,20 +790,20 @@ def _terminate_if_inactivity_duration_exceeded( job_model.inactivity_secs = no_connections_secs if no_connections_secs is None: # TODO(0.19 or earlier): make no_connections_secs required - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY job_model.termination_reason_message = ( "The selected instance was created before dstack 0.18.41" " and does not support inactivity_duration" ) + switch_job_status(session, job_model, JobStatus.TERMINATING) elif no_connections_secs >= conf.inactivity_duration: - job_model.status = JobStatus.TERMINATING # TODO(0.19 or earlier): set JobTerminationReason.INACTIVITY_DURATION_EXCEEDED job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER job_model.termination_reason_message = ( f"The job was inactive for {no_connections_secs} seconds," f" exceeding the inactivity_duration of {conf.inactivity_duration} seconds" ) + switch_job_status(session, job_model, JobStatus.TERMINATING) def _should_terminate_job_due_to_disconnect(job_model: JobModel) -> bool: @@ -851,8 +860,10 @@ async def _maybe_register_replica( fmt(job_model), e, ) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.GATEWAY_ERROR + # Not including e.args[0] in the message to avoid exposing internal details + job_model.termination_reason_message = "Failed to register service replica" + switch_job_status(session, job_model, JobStatus.TERMINATING) async def _check_gpu_utilization(session: AsyncSession, job_model: JobModel, job: Job) -> None: @@ -873,14 +884,14 @@ async def _check_gpu_utilization(session: AsyncSession, job_model: JobModel, job if _should_terminate_due_to_low_gpu_util( policy.min_gpu_utilization, [m.values for m in gpus_util_metrics] ): - logger.info("%s: GPU utilization check: terminating", fmt(job_model)) - job_model.status = JobStatus.TERMINATING + logger.debug("%s: GPU utilization check: terminating", fmt(job_model)) # TODO(0.19 or earlier): set JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER job_model.termination_reason_message = ( f"The job GPU utilization below {policy.min_gpu_utilization}%" f" for {policy.time_window} seconds" ) + switch_job_status(session, job_model, JobStatus.TERMINATING) else: logger.debug("%s: GPU utilization check: OK", fmt(job_model)) @@ -998,6 +1009,7 @@ async def _get_job_file_archive( @runner_ssh_tunnel(ports=[DSTACK_RUNNER_HTTP_PORT], retries=1) def _submit_job_to_runner( ports: Dict[int, int], + session: AsyncSession, run: Run, job_model: JobModel, job: Job, @@ -1053,7 +1065,7 @@ def _submit_job_to_runner( logger.debug("%s: starting job", fmt(job_model)) runner_client.run_job() - job_model.status = JobStatus.RUNNING + switch_job_status(session, job_model, JobStatus.RUNNING) # do not log here, because the runner will send a new status return True diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 4ab2633d91..af2dcee8d8 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -33,6 +33,7 @@ get_job_specs_from_run_spec, group_jobs_by_replica_latest, is_master_job, + switch_job_status, ) from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.prometheus.client_metrics import run_metrics @@ -40,6 +41,7 @@ 
fmt, process_terminating_run, run_model_to_run, + switch_run_status, ) from dstack._internal.server.services.runs.replicas import ( is_replica_registered, @@ -168,12 +170,12 @@ async def _process_run(session: AsyncSession, run_model: RunModel): await process_terminating_run(session, run_model) else: logger.error("%s: unexpected status %s", fmt(run_model), run_model.status.name) - run_model.status = RunStatus.TERMINATING run_model.termination_reason = RunTerminationReason.SERVER_ERROR + switch_run_status(session, run_model, RunStatus.TERMINATING) except ServerError as e: logger.error("%s: run processing error: %s", fmt(run_model), e) - run_model.status = RunStatus.TERMINATING run_model.termination_reason = RunTerminationReason.SERVER_ERROR + switch_run_status(session, run_model, RunStatus.TERMINATING) run_model.last_processed_at = common.get_current_datetime() await session.commit() @@ -206,9 +208,7 @@ async def _process_pending_run(session: AsyncSession, run_model: RunModel): return await scale_run_replicas(session, run_model, replicas_diff=run_model.desired_replica_count) - - run_model.status = RunStatus.SUBMITTED - logger.info("%s: run status has changed PENDING -> SUBMITTED", fmt(run_model)) + switch_run_status(session, run_model, RunStatus.SUBMITTED) def _retrying_run_ready_for_resubmission(run_model: RunModel, run: Run) -> bool: @@ -356,8 +356,9 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): if not ( job_model.status.is_finished() or job_model.status == JobStatus.TERMINATING ): - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER + job_model.termination_reason_message = "Run is to be resubmitted" + switch_job_status(session, job_model, JobStatus.TERMINATING) if new_status not in {RunStatus.TERMINATING, RunStatus.PENDING}: # No need to retry, scale, or redeploy replicas if the run is terminating, @@ -367,12 +368,6 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): ) if run_model.status != new_status: - logger.info( - "%s: run status has changed %s -> %s", - fmt(run_model), - run_model.status.name, - new_status.name, - ) if run_model.status == RunStatus.SUBMITTED and new_status == RunStatus.PROVISIONING: current_time = common.get_current_datetime() submit_to_provision_duration = (current_time - run_model.submitted_at).total_seconds() @@ -391,8 +386,8 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): # Unassign run from fleet so that the new fleet can be chosen when retrying run_model.fleet = None - run_model.status = new_status run_model.termination_reason = termination_reason + switch_run_status(session, run_model, new_status) # While a run goes to pending without provisioning, resubmission_attempt increases. 
if new_status == RunStatus.PROVISIONING: run_model.resubmission_attempt = 0 diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index 3e08fe9238..defa75e8b5 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -69,6 +69,7 @@ VolumeAttachmentModel, VolumeModel, ) +from dstack._internal.server.services import events from dstack._internal.server.services.backends import get_project_backend_by_type_or_error from dstack._internal.server.services.fleets import ( check_can_create_new_cloud_instance_in_fleet, @@ -79,6 +80,7 @@ is_cloud_cluster, ) from dstack._internal.server.services.instances import ( + format_instance_status_for_event, get_instance_provisioning_data, ) from dstack._internal.server.services.jobs import ( @@ -90,6 +92,7 @@ get_job_runtime_data, is_master_job, is_multinode_job, + switch_job_status, ) from dstack._internal.server.services.locking import get_locker, string_to_lock_id from dstack._internal.server.services.logging import fmt @@ -273,9 +276,9 @@ async def _process_submitted_job( check_can_attach_job_volumes(volumes) except ServerClientError as e: logger.warning("%s: failed to prepare run volumes: %s", fmt(job_model), repr(e)) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.VOLUME_ERROR job_model.termination_reason_message = e.msg + switch_job_status(session, job_model, JobStatus.TERMINATING) job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() return @@ -333,21 +336,21 @@ async def _process_submitted_job( if run_spec.merged_profile.fleets is not None: # Run cannot create new fleets when fleets are specified logger.debug("%s: failed to use specified fleets", fmt(job_model)) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = ( JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY ) job_model.termination_reason_message = "Failed to use specified fleets" + switch_job_status(session, job_model, JobStatus.TERMINATING) job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() return if not FeatureFlags.AUTOCREATED_FLEETS_ENABLED: logger.debug("%s: no fleet found", fmt(job_model)) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = ( JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY ) job_model.termination_reason_message = "Failed to find fleet" + switch_job_status(session, job_model, JobStatus.TERMINATING) job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() return @@ -375,12 +378,13 @@ async def _process_submitted_job( .execution_options(populate_existing=True) ) instance = res.unique().scalar_one() - job_model.status = JobStatus.PROVISIONING + switch_job_status(session, job_model, JobStatus.PROVISIONING) else: if run_profile.creation_policy == CreationPolicy.REUSE: logger.debug("%s: reuse instance failed", fmt(job_model)) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + job_model.termination_reason_message = "Could not reuse any instances for this job" + switch_job_status(session, job_model, JobStatus.TERMINATING) job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() return @@ -410,8 +414,8 @@ async def _process_submitted_job( ) if run_job_result 
is None: logger.debug("%s: provisioning failed", fmt(job_model)) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY + switch_job_status(session, job_model, JobStatus.TERMINATING) job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() return @@ -424,6 +428,15 @@ async def _process_submitted_job( run=run, ) session.add(fleet_model) + events.emit( + session, + f"Fleet created for job. Fleet status: {fleet_model.status.upper()}", + actor=events.SystemActor(), + targets=[ + events.Target.from_model(fleet_model), + events.Target.from_model(job_model), + ], + ) provisioning_data, offer, effective_profile, _ = run_job_result compute_group_model = None @@ -448,7 +461,7 @@ async def _process_submitted_job( instance = None # Instance for attaching volumes in case of single job provisioned for provisioned_job_model, jpd in zip(provisioned_job_models, jpds): provisioned_job_model.job_provisioning_data = jpd.json() - provisioned_job_model.status = JobStatus.PROVISIONING + switch_job_status(session, provisioned_job_model, JobStatus.PROVISIONING) # FIXME: Fleet is not locked which may lead to duplicate instance_num. # This is currently hard to fix without locking the fleet for entire provisioning duration. # Processing should be done in multiple steps so that @@ -470,16 +483,16 @@ async def _process_submitted_job( provisioned_job_model.job_runtime_data = _prepare_job_runtime_data( offer, multinode ).json() - logger.info( - "Created a new instance %s for job %s", - instance.name, - provisioned_job_model.job_name, - extra={ - "instance_name": instance.name, - "instance_status": InstanceStatus.PROVISIONING.value, - }, - ) session.add(instance) + events.emit( + session, + f"Instance created for job. Instance status: {format_instance_status_for_event(instance)}", + actor=events.SystemActor(), + targets=[ + events.Target.from_model(instance), + events.Target.from_model(provisioned_job_model), + ], + ) provisioned_job_model.used_instance_id = instance.id provisioned_job_model.last_processed_at = common_utils.get_current_datetime() @@ -615,20 +628,22 @@ async def _assign_job_to_fleet_instance( instance.status = InstanceStatus.BUSY instance.busy_blocks += offer.blocks - logger.info( - "The job %s switched instance %s status to BUSY", - job_model.job_name, - instance.name, - extra={ - "instance_name": instance.name, - "instance_status": InstanceStatus.BUSY.value, - }, - ) - logger.info("%s: now is provisioning on '%s'", fmt(job_model), instance.name) job_model.instance = instance job_model.used_instance_id = instance.id job_model.job_provisioning_data = instance.job_provisioning_data job_model.job_runtime_data = _prepare_job_runtime_data(offer, multinode).json() + events.emit( + session, + ( + "Job assigned to instance." 
+ f" Instance status: {format_instance_status_for_event(instance)}" + ), + actor=events.SystemActor(), + targets=[ + events.Target.from_model(job_model), + events.Target.from_model(instance), + ], + ) return instance @@ -1014,17 +1029,17 @@ async def _attach_volumes( break # attach next mount point except (ServerClientError, BackendError) as e: logger.warning("%s: failed to attached volume: %s", fmt(job_model), repr(e)) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.VOLUME_ERROR job_model.termination_reason_message = "Failed to attach volume" + switch_job_status(session, job_model, JobStatus.TERMINATING) except Exception: logger.exception( "%s: got exception when attaching volume", fmt(job_model), ) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.VOLUME_ERROR job_model.termination_reason_message = "Failed to attach volume" + switch_job_status(session, job_model, JobStatus.TERMINATING) finally: job_model.job_runtime_data = job_runtime_data.json() diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index b64d58b9e5..22a70eceb3 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -373,6 +373,7 @@ class RunModel(BaseModel): submitted_at: Mapped[datetime] = mapped_column(NaiveDateTime) last_processed_at: Mapped[datetime] = mapped_column(NaiveDateTime) next_triggered_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) + # NOTE: `status` must be changed only via `switch_run_status()` status: Mapped[RunStatus] = mapped_column(EnumAsString(RunStatus, 100), index=True) termination_reason: Mapped[Optional[RunTerminationReason]] = mapped_column( EnumAsString(RunTerminationReason, 100) @@ -424,6 +425,7 @@ class JobModel(BaseModel): submission_num: Mapped[int] = mapped_column(Integer) submitted_at: Mapped[datetime] = mapped_column(NaiveDateTime) last_processed_at: Mapped[datetime] = mapped_column(NaiveDateTime) + # NOTE: `status` must be changed only via `switch_job_status()` status: Mapped[JobStatus] = mapped_column(EnumAsString(JobStatus, 100), index=True) termination_reason: Mapped[Optional[JobTerminationReason]] = mapped_column( EnumAsString(JobTerminationReason, 100) @@ -564,6 +566,7 @@ class FleetModel(BaseModel): deleted: Mapped[bool] = mapped_column(Boolean, default=False) deleted_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) + # NOTE: `status` must be changed only via `switch_fleet_status()` status: Mapped[FleetStatus] = mapped_column(EnumAsString(FleetStatus, 100), index=True) status_message: Mapped[Optional[str]] = mapped_column(Text) diff --git a/src/dstack/_internal/server/routers/runs.py b/src/dstack/_internal/server/routers/runs.py index 30c8dbbd44..24baee9179 100644 --- a/src/dstack/_internal/server/routers/runs.py +++ b/src/dstack/_internal/server/routers/runs.py @@ -170,9 +170,10 @@ async def stop_runs( """ Stop one or more runs. """ - _, project = user_project + user, project = user_project await runs.stop_runs( session=session, + user=user, project=project, runs_names=body.runs_names, abort=body.abort, @@ -188,8 +189,8 @@ async def delete_runs( """ Delete one or more runs. The runs must be stopped before they can be deleted. 
""" - _, project = user_project - await runs.delete_runs(session=session, project=project, runs_names=body.runs_names) + user, project = user_project + await runs.delete_runs(session=session, user=user, project=project, runs_names=body.runs_names) # apply_plan replaces submit_run since it can create new runs. diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index b1285134ec..16901bdf1a 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -57,6 +57,7 @@ from dstack._internal.server.services import instances as instances_services from dstack._internal.server.services import offers as offers_services from dstack._internal.server.services.instances import ( + format_instance_status_for_event, get_instance_remote_connection_info, list_active_remote_instances, ) @@ -78,6 +79,25 @@ logger = get_logger(__name__) +def switch_fleet_status( + session: AsyncSession, + fleet_model: FleetModel, + new_status: FleetStatus, + actor: events.AnyActor = events.SystemActor(), +): + """ + Switch fleet status. + """ + old_status = fleet_model.status + if old_status == new_status: + return + + fleet_model.status = new_status + + msg = f"Fleet status changed {old_status.upper()} -> {new_status.upper()}" + events.emit(session, msg, actor=actor, targets=[events.Target.from_model(fleet_model)]) + + async def list_fleets( session: AsyncSession, user: UserModel, @@ -414,6 +434,7 @@ async def apply_plan( if fleet_model is not None: return await _update_fleet( session=session, + user=user, project=project, spec=spec, current_resource=plan.current_resource, @@ -588,7 +609,12 @@ async def delete_fleets( _terminate_fleet_instances(fleet_model=fleet_model, instance_nums=instance_nums) # TERMINATING fleets are deleted by process_fleets after instances are terminated if instance_nums is None: - fleet_model.status = FleetStatus.TERMINATING + switch_fleet_status( + session, + fleet_model, + FleetStatus.TERMINATING, + actor=events.UserActor.from_user(user), + ) await session.commit() @@ -761,7 +787,7 @@ async def _create_fleet( ) if spec.configuration.ssh_config is not None: for i, host in enumerate(spec.configuration.ssh_config.hosts): - instances_model = await create_fleet_ssh_instance_model( + instance_model = await create_fleet_ssh_instance_model( project=project, spec=spec, ssh_params=spec.configuration.ssh_config, @@ -769,7 +795,16 @@ async def _create_fleet( instance_num=i, host=host, ) - fleet_model.instances.append(instances_model) + events.emit( + session, + ( + "Instance created on fleet submission." + f" Status: {format_instance_status_for_event(instance_model)}" + ), + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_model)], + ) + fleet_model.instances.append(instance_model) else: for i in range(_get_fleet_nodes_to_provision(spec)): instance_model = create_fleet_instance_model( @@ -779,20 +814,27 @@ async def _create_fleet( spec=spec, instance_num=i, ) + events.emit( + session, + ( + "Instance created on fleet submission." + f" Status: {format_instance_status_for_event(instance_model)}" + ), + # Set `SystemActor` for consistency with other places where cloud instances can be + # created (fleet spec consolidation, job provisioning, etc). Think of the fleet as being + # created by the user, while the cloud instance is created by the system to satisfy the + # fleet spec. 
+ actor=events.SystemActor(), + targets=[events.Target.from_model(instance_model)], + ) fleet_model.instances.append(instance_model) - for instance_model in fleet_model.instances: - events.emit( - session, - f"Instance created on fleet submission. Status: {instance_model.status.upper()}", - actor=events.SystemActor(), - targets=[events.Target.from_model(instance_model)], - ) await session.commit() return fleet_model_to_fleet(fleet_model) async def _update_fleet( session: AsyncSession, + user: UserModel, project: ProjectModel, spec: FleetSpec, current_resource: Optional[Fleet], @@ -860,6 +902,15 @@ async def _update_fleet( instance_num=instance_num, host=host, ) + events.emit( + session, + ( + "Instance created on fleet update." + f" Status: {format_instance_status_for_event(instance_model)}" + ), + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_model)], + ) fleet_model.instances.append(instance_model) active_instance_nums.add(instance_num) if removed_instance_nums: diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index 7c679b0cc2..56459efd78 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -59,6 +59,13 @@ logger = get_logger(__name__) +def format_instance_status_for_event(instance_model: InstanceModel) -> str: + msg = instance_model.status.upper() + if instance_model.total_blocks is not None: + msg += f" ({instance_model.busy_blocks}/{instance_model.total_blocks} blocks busy)" + return msg + + async def get_instance_health_checks( session: AsyncSession, project: ProjectModel, diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index 314bacb422..1ed3c5f99e 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -41,9 +41,12 @@ RunModel, VolumeModel, ) -from dstack._internal.server.services import services +from dstack._internal.server.services import events, services from dstack._internal.server.services import volumes as volumes_services -from dstack._internal.server.services.instances import get_instance_ssh_private_keys +from dstack._internal.server.services.instances import ( + format_instance_status_for_event, + get_instance_ssh_private_keys, +) from dstack._internal.server.services.jobs.configurators.base import ( JobConfigurator, interpolate_job_volumes, @@ -66,6 +69,34 @@ logger = get_logger(__name__) +def switch_job_status( + session: AsyncSession, + job_model: JobModel, + new_status: JobStatus, + actor: events.AnyActor = events.SystemActor(), +): + """ + Switch job status. + + **NOTE**: When switching to `TERMINATING`, set `termination_reason` and preferably + `termination_reason_message` before calling this function. + """ + old_status = job_model.status + if old_status == new_status: + return + + job_model.status = new_status + + msg = f"Job status changed {old_status.upper()} -> {new_status.upper()}" + if new_status == JobStatus.TERMINATING: + if job_model.termination_reason is None: + raise ValueError("termination_reason must be set when switching to TERMINATING status") + msg += f". 
Termination reason: {job_model.termination_reason.upper()}" + if job_model.termination_reason_message: + msg += f" ({job_model.termination_reason_message})" + events.emit(session, msg, actor=actor, targets=[events.Target.from_model(job_model)]) + + async def get_jobs_from_run_spec( run_spec: RunSpec, secrets: Dict[str, str], replica_num: int ) -> List[Job]: @@ -277,7 +308,7 @@ async def process_terminating_job( if instance_model is None: # Possible if the job hasn't been assigned an instance yet await services.unregister_replica(session, job_model) - _set_job_termination_status(job_model) + _set_job_termination_status(session, job_model) return all_volumes_detached: bool = True @@ -339,6 +370,19 @@ async def process_terminating_job( job_model.instance_id = None instance_model.last_job_processed_at = common.get_current_datetime() + events.emit( + session, + ( + "Job unassigned from instance." + f" Instance status: {format_instance_status_for_event(instance_model)}" + ), + actor=events.SystemActor(), + targets=[ + events.Target.from_model(job_model), + events.Target.from_model(instance_model), + ], + ) + volume_names = ( jrd.volume_names if jrd and jrd.volume_names @@ -351,16 +395,10 @@ async def process_terminating_job( for volume in volumes: volume.last_job_processed_at = common.get_current_datetime() - logger.info( - "%s: instance '%s' has been released, new status is %s", - fmt(job_model), - instance_model.name, - instance_model.status.name, - ) await services.unregister_replica(session, job_model) if all_volumes_detached: # Do not terminate while some volumes are not detached. - _set_job_termination_status(job_model) + _set_job_termination_status(session, job_model) async def process_volumes_detaching( @@ -395,22 +433,15 @@ async def process_volumes_detaching( # Do not terminate the job while some volumes are not detached. # If force detach never succeeds, the job will be stuck terminating. # The job releases the instance when soft detaching, so the instance won't be stuck. - _set_job_termination_status(job_model) + _set_job_termination_status(session, job_model) -def _set_job_termination_status(job_model: JobModel): +def _set_job_termination_status(session: AsyncSession, job_model: JobModel): if job_model.termination_reason is not None: - job_model.status = job_model.termination_reason.to_status() - termination_reason_name = job_model.termination_reason.name + status = job_model.termination_reason.to_status() else: - job_model.status = JobStatus.FAILED - termination_reason_name = None - logger.info( - "%s: job status is %s, reason: %s", - fmt(job_model), - job_model.status.name, - termination_reason_name, - ) + status = JobStatus.FAILED + switch_job_status(session, job_model, status) async def stop_container( diff --git a/src/dstack/_internal/server/services/runs/__init__.py b/src/dstack/_internal/server/services/runs/__init__.py index 18c0847b41..5773403cff 100644 --- a/src/dstack/_internal/server/services/runs/__init__.py +++ b/src/dstack/_internal/server/services/runs/__init__.py @@ -57,6 +57,7 @@ job_model_to_job_submission, remove_job_spec_sensitive_info, stop_runner, + switch_job_status, ) from dstack._internal.server.services.locking import get_locker, string_to_lock_id from dstack._internal.server.services.logging import fmt @@ -84,6 +85,29 @@ } +def switch_run_status( + session: AsyncSession, + run_model: RunModel, + new_status: RunStatus, + actor: events.AnyActor = events.SystemActor(), +): + """ + Switch run status. 
+ """ + old_status = run_model.status + if old_status == new_status: + return + + run_model.status = new_status + + msg = f"Run status changed {old_status.upper()} -> {new_status.upper()}" + if new_status == RunStatus.TERMINATING: + if run_model.termination_reason is None: + raise ValueError("termination_reason must be set when switching to TERMINATING status") + msg += f". Termination reason: {run_model.termination_reason.upper()}" + events.emit(session, msg, actor=actor, targets=[events.Target.from_model(run_model)]) + + async def list_user_runs( session: AsyncSession, user: UserModel, @@ -449,7 +473,9 @@ async def submit_run( project=project, ) else: - await delete_runs(session=session, project=project, runs_names=[run_spec.run_name]) + await delete_runs( + session=session, user=user, project=project, runs_names=[run_spec.run_name] + ) await _validate_run( session=session, @@ -510,6 +536,10 @@ async def submit_run( events.emit( session, f"Job created on run submission. Status: {job_model.status.upper()}", + # Set `SystemActor` for consistency with all other places where jobs can be + # created (retry, scaling, rolling deployments, etc). Think of the run as being + # created by the user, while the job is created by the system to satisfy the + # run spec. actor=events.SystemActor(), targets=[ events.Target.from_model(job_model), @@ -527,6 +557,11 @@ def create_job_model_for_new_submission( job: Job, status: JobStatus, ) -> JobModel: + """ + Create a new job. + + **NOTE**: don't forget to emit an event when writing the new job to the database. + """ now = common_utils.get_current_datetime() return JobModel( id=uuid.uuid4(), @@ -551,6 +586,7 @@ def create_job_model_for_new_submission( async def stop_runs( session: AsyncSession, + user: UserModel, project: ProjectModel, runs_names: List[str], abort: bool, @@ -582,11 +618,13 @@ async def stop_runs( for run_model in run_models: if run_model.status.is_finished(): continue - run_model.status = RunStatus.TERMINATING if abort: run_model.termination_reason = RunTerminationReason.ABORTED_BY_USER else: run_model.termination_reason = RunTerminationReason.STOPPED_BY_USER + switch_run_status( + session, run_model, RunStatus.TERMINATING, actor=events.UserActor.from_user(user) + ) run_model.last_processed_at = now # The run will be terminated by process_runs. # Terminating synchronously is problematic since it may take a long time. 
@@ -595,6 +633,7 @@ async def stop_runs( async def delete_runs( session: AsyncSession, + user: UserModel, project: ProjectModel, runs_names: List[str], ): @@ -620,14 +659,15 @@ async def delete_runs( raise ServerClientError( msg=f"Cannot delete active runs: {[r.run_name for r in active_runs]}" ) - await session.execute( - update(RunModel) - .where( - RunModel.project_id == project.id, - RunModel.run_name.in_(runs_names), - ) - .values(deleted=True) - ) + for run_model in run_models: + if not run_model.deleted: + run_model.deleted = True + events.emit( + session, + "Run deleted", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(run_model)], + ) await session.commit() @@ -910,8 +950,8 @@ async def process_terminating_run(session: AsyncSession, run_model: RunModel): # Send a signal to stop the job gracefully await stop_runner(session, job_model) delay_job_instance_termination(job_model) - job_model.status = JobStatus.TERMINATING job_model.termination_reason = job_termination_reason + switch_job_status(session, job_model, JobStatus.TERMINATING) job_model.last_processed_at = common_utils.get_current_datetime() if unfinished_jobs_count == 0: @@ -926,18 +966,11 @@ async def process_terminating_run(session: AsyncSession, run_model: RunModel): not in [RunTerminationReason.ABORTED_BY_USER, RunTerminationReason.STOPPED_BY_USER] ): run_model.next_triggered_at = _get_next_triggered_at(run.run_spec) - run_model.status = RunStatus.PENDING + switch_run_status(session, run_model, RunStatus.PENDING) # Unassign run from fleet so that the new fleet can be chosen on the next submission run_model.fleet = None else: - run_model.status = run_model.termination_reason.to_status() - - logger.info( - "%s: run status has changed TERMINATING -> %s, reason: %s", - fmt(run_model), - run_model.status.name, - run_model.termination_reason.name, - ) + switch_run_status(session, run_model, run_model.termination_reason.to_status()) def is_job_ready(probes: Iterable[ProbeModel], probe_specs: Iterable[ProbeSpec]) -> bool: diff --git a/src/dstack/_internal/server/services/runs/replicas.py b/src/dstack/_internal/server/services/runs/replicas.py index b1c33c90c7..43065d96d9 100644 --- a/src/dstack/_internal/server/services/runs/replicas.py +++ b/src/dstack/_internal/server/services/runs/replicas.py @@ -4,9 +4,11 @@ from dstack._internal.core.models.runs import JobStatus, JobTerminationReason, RunSpec from dstack._internal.server.models import JobModel, RunModel +from dstack._internal.server.services import events from dstack._internal.server.services.jobs import ( get_jobs_from_run_spec, group_jobs_by_replica_latest, + switch_job_status, ) from dstack._internal.server.services.logging import fmt from dstack._internal.server.services.runs import create_job_model_for_new_submission, logger @@ -35,8 +37,9 @@ async def retry_run_replica_jobs( # No need to resubmit, skip continue # The job is not finished, but we have to retry all jobs. 
Terminate it - job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER + job_model.termination_reason_message = "Replica is to be retried" + switch_job_status(session, job_model, JobStatus.TERMINATING) new_job_model = create_job_model_for_new_submission( run_model=run_model, @@ -46,6 +49,12 @@ async def retry_run_replica_jobs( # dirty hack to avoid passing all job submissions new_job_model.submission_num = job_model.submission_num + 1 session.add(new_job_model) + events.emit( + session, + f"Job created when re-running replica. Status: {new_job_model.status.upper()}", + actor=events.SystemActor(), + targets=[events.Target.from_model(new_job_model)], + ) def is_replica_registered(jobs: list[JobModel]) -> bool: @@ -133,3 +142,9 @@ async def scale_run_replicas(session: AsyncSession, run_model: RunModel, replica status=JobStatus.SUBMITTED, ) session.add(job_model) + events.emit( + session, + f"Job created on new replica submission. Status: {job_model.status.upper()}", + actor=events.SystemActor(), + targets=[events.Target.from_model(job_model)], + ) diff --git a/src/dstack/_internal/utils/interpolator.py b/src/dstack/_internal/utils/interpolator.py index 421280cbd9..9d2fd8997a 100644 --- a/src/dstack/_internal/utils/interpolator.py +++ b/src/dstack/_internal/utils/interpolator.py @@ -13,6 +13,13 @@ class Name: class InterpolatorError(ValueError): + """ + Raised when interpolation fails. + + May be shown to the users, should not contain sensitive information, + such as variable values. + """ + pass
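
A minimal usage sketch of the InterpolatorError contract documented above (illustrative; it mirrors the handling added to process_running_jobs.py earlier in this patch and assumes the surrounding names are in scope):

    try:
        _interpolate_secrets(secrets, job.job_spec)
    except InterpolatorError as e:
        # Per the docstring, e.args[0] is safe to show to users: it must not
        # contain sensitive data such as interpolated variable values.
        job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
        job_model.termination_reason_message = f"Secrets interpolation error: {e.args[0]}"
        switch_job_status(session, job_model, JobStatus.TERMINATING)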