Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ repos:
rev: v2.6.2 # Should match .github/workflows/build-artifacts.yml
hooks:
- id: golangci-lint-full
alias: runner-fix
language_version: 1.25.0 # Should match runner/go.mod
entry: bash -c 'cd runner && golangci-lint run --fix'
stages: [manual]
- id: golangci-lint-full
alias: runner-lint
language_version: 1.25.0 # Should match runner/go.mod
entry: bash -c 'cd runner && golangci-lint run'
stages: [manual]
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/reference/dstack.yml/dev-environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ The `dev-environment` configuration type allows running [dev environments](../..
type:
required: true

??? info "`if_exists` action"

If the `path` already exists and is a non-empty directory, by default the run is terminated with an error.
This can be changed with the `if_exists` option:

* `error` – do not try to check out, terminate the run with an error (the default action since `0.20.0`)
* `skip` – do not try to check out, skip the repo (the only action available before `0.20.0`)

Note that if the `path` exists and is _not_ a directory (e.g., a regular file), the run is always terminated
with an error; the `skip` action does not apply in this case.

??? info "Short syntax"

The short syntax for repos is a colon-separated string in the form of `local_path_or_url:path`.
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/reference/dstack.yml/service.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,17 @@ The `service` configuration type allows running [services](../../concepts/servic
type:
required: true

??? info "`if_exists` action"

If the `path` already exists and is a non-empty directory, by default the run is terminated with an error.
This can be changed with the `if_exists` option:

* `error` – do not try to check out, terminate the run with an error (the default action since `0.20.0`)
* `skip` – do not try to check out, skip the repo (the only action available before `0.20.0`)

Note that if the `path` exists and is _not_ a directory (e.g., a regular file), the run is always terminated
with an error; the `skip` action does not apply in this case.

??? info "Short syntax"

The short syntax for repos is a colon-separated string in the form of `local_path_or_url:path`.
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/reference/dstack.yml/task.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ The `task` configuration type allows running [tasks](../../concepts/tasks.md).
type:
required: true

??? info "`if_exists` action"

If the `path` already exists and is a non-empty directory, by default the run is terminated with an error.
This can be changed with the `if_exists` option:

* `error` – do not try to check out, terminate the run with an error (the default action since `0.20.0`)
* `skip` – do not try to check out, skip the repo (the only action available before `0.20.0`)

Note that if the `path` exists and is _not_ a directory (e.g., a regular file), the run is always terminated
with an error; the `skip` action does not apply in this case.

??? info "Short syntax"

The short syntax for repos is a colon-separated string in the form of `local_path_or_url:path`.
Expand Down
45 changes: 29 additions & 16 deletions runner/internal/executor/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/dstackai/dstack/runner/internal/common"
"github.com/dstackai/dstack/runner/internal/log"
"github.com/dstackai/dstack/runner/internal/repo"
"github.com/dstackai/dstack/runner/internal/schemas"
)

// setupRepo must be called from Run
Expand All @@ -36,13 +37,27 @@ func (ex *RunExecutor) setupRepo(ctx context.Context) error {
}
log.Trace(ctx, "Job repo dir", "path", ex.repoDir)

shouldCheckout, err := ex.shouldCheckout(ctx)
repoDirIsEmpty, err := ex.prepareRepoDir(ctx)
if err != nil {
return fmt.Errorf("check if checkout needed: %w", err)
}
if !shouldCheckout {
log.Info(ctx, "skipping repo checkout: repo dir is not empty")
return nil
return fmt.Errorf("prepare repo dir: %w", err)
}
if !repoDirIsEmpty {
var repoExistsAction schemas.RepoExistsAction
if ex.jobSpec.RepoExistsAction != nil {
repoExistsAction = *ex.jobSpec.RepoExistsAction
} else {
log.Debug(ctx, "repo_exists_action is not set, using legacy 'skip' action")
repoExistsAction = schemas.RepoExistsActionSkip
}
switch repoExistsAction {
case schemas.RepoExistsActionError:
return fmt.Errorf("setup repo: repo dir is not empty: %s", ex.repoDir)
case schemas.RepoExistsActionSkip:
log.Info(ctx, "Skipping repo checkout: repo dir is not empty", "path", ex.repoDir)
return nil
default:
return fmt.Errorf("setup repo: unsupported action: %s", repoExistsAction)
}
}
// Move existing repo files from the repo dir and back to be able to git clone.
// Currently, only needed for volumes mounted inside repo with lost+found present.
Expand Down Expand Up @@ -143,11 +158,11 @@ func (ex *RunExecutor) prepareArchive(ctx context.Context) error {
return nil
}

func (ex *RunExecutor) shouldCheckout(ctx context.Context) (bool, error) {
log.Trace(ctx, "checking if repo checkout is needed")
func (ex *RunExecutor) prepareRepoDir(ctx context.Context) (bool, error) {
log.Trace(ctx, "Preparing repo dir")
info, err := os.Stat(ex.repoDir)
if err != nil {
if os.IsNotExist(err) {
if errors.Is(err, os.ErrNotExist) {
if err = common.MkdirAll(ctx, ex.repoDir, ex.jobUid, ex.jobGid); err != nil {
return false, fmt.Errorf("create repo dir: %w", err)
}
Expand All @@ -157,24 +172,22 @@ func (ex *RunExecutor) shouldCheckout(ctx context.Context) (bool, error) {
return false, fmt.Errorf("stat repo dir: %w", err)
}
if !info.IsDir() {
return false, fmt.Errorf("failed to set up repo dir: %s is not a dir", ex.repoDir)
return false, fmt.Errorf("stat repo dir: %s is not a dir", ex.repoDir)
}
entries, err := os.ReadDir(ex.repoDir)
if err != nil {
return false, fmt.Errorf("read repo dir: %w", err)
}
if len(entries) == 0 {
// Repo dir existed but was empty, e.g. a volume without repo
// Repo dir is empty
return true, nil
}
if len(entries) > 1 {
// Repo already checked out, e.g. a volume with repo
return false, nil
}
if entries[0].Name() == "lost+found" {
if len(entries) == 1 && entries[0].Name() == "lost+found" {
// lost+found may be present on a newly created volume
// We (but not Git, see `{move,restore}RepoDir`) consider such a dir "empty"
return true, nil
}
// Repo dir is not empty
return false, nil
}

Expand Down
12 changes: 10 additions & 2 deletions runner/internal/schemas/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ type JobSpec struct {
// `RepoData` is optional for compatibility with jobs submitted before 0.19.17.
// Use `RunExecutor.getRepoData()` to get non-nil `RepoData`.
// TODO: make required when supporting jobs submitted before 0.19.17 is no longer relevant.
RepoData *RepoData `json:"repo_data"`
FileArchives []FileArchive `json:"file_archives"`
RepoData *RepoData `json:"repo_data"`
RepoExistsAction *RepoExistsAction `json:"repo_exists_action"`
FileArchives []FileArchive `json:"file_archives"`
}

type ClusterInfo struct {
Expand Down Expand Up @@ -102,6 +103,13 @@ type RepoData struct {
RepoConfigEmail string `json:"repo_config_email"`
}

// RepoExistsAction is the action taken by the runner when the repo dir
// already exists and is not empty.
type RepoExistsAction string

const (
// RepoExistsActionError makes repo setup fail with an error; no checkout is attempted.
RepoExistsActionError RepoExistsAction = "error"
// RepoExistsActionSkip makes repo setup return successfully without checking out.
RepoExistsActionSkip RepoExistsAction = "skip"
)

type FileArchive struct {
Id string `json:"id"`
Path string `json:"path"`
Expand Down
6 changes: 5 additions & 1 deletion scripts/docs/gen_schema_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import inspect
import logging
import re
from enum import Enum
from fnmatch import fnmatch

import mkdocs_gen_files
Expand Down Expand Up @@ -63,11 +64,14 @@ def generate_schema_reference(
]
)
for name, field in cls.__fields__.items():
default = field.default
if isinstance(default, Enum):
default = default.value
values = dict(
name=name,
description=field.field_info.description,
type=get_type(field.annotation),
default=field.default,
default=default,
required=field.required,
)
# TODO: If the field doesn't have description (e.g. BaseConfiguration.type), we could fallback to docstring
Expand Down
1 change: 0 additions & 1 deletion src/dstack/_internal/core/compatibility/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType:
configuration_excludes["schedule"] = True
if profile is not None and profile.schedule is None:
profile_excludes.add("schedule")
configuration_excludes["repos"] = True

if configuration_excludes:
spec_excludes["configuration"] = configuration_excludes
Expand Down
18 changes: 17 additions & 1 deletion src/dstack/_internal/core/models/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from dstack._internal.core.models.services import AnyModel, OpenAIChatModel
from dstack._internal.core.models.unix import UnixUser
from dstack._internal.core.models.volumes import MountPoint, VolumeConfiguration, parse_mount_point
from dstack._internal.utils.common import has_duplicates
from dstack._internal.utils.common import has_duplicates, list_enum_values_for_annotation
from dstack._internal.utils.json_schema import add_extra_schema_types
from dstack._internal.utils.json_utils import (
pydantic_orjson_dumps_with_indent,
Expand Down Expand Up @@ -95,6 +95,13 @@ def parse(cls, v: str) -> "PortMapping":
return PortMapping(local_port=local_port, container_port=int(container_port))


# The action to take when the repo `path` already exists and is not empty (see `RepoSpec.if_exists`)
class RepoExistsAction(str, Enum):
# Don't try to check out, terminate the run with an error (the default action since 0.20.0)
ERROR = "error"
# Don't try to check out, skip the repo (the logic hardcoded in the pre-0.20.0 runner)
SKIP = "skip"


class RepoSpec(CoreModel):
local_path: Annotated[
Optional[str],
Expand Down Expand Up @@ -132,6 +139,15 @@ class RepoSpec(CoreModel):
)
),
] = None
if_exists: Annotated[
RepoExistsAction,
Field(
description=(
"The action to be taken if `path` exists and is not empty."
f" One of: {list_enum_values_for_annotation(RepoExistsAction)}"
),
),
] = RepoExistsAction.ERROR

@classmethod
def parse(cls, v: str) -> Self:
Expand Down
3 changes: 3 additions & 0 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
AnyRunConfiguration,
HTTPHeaderSpec,
HTTPMethod,
RepoExistsAction,
RunConfiguration,
ServiceConfiguration,
)
Expand Down Expand Up @@ -281,6 +282,8 @@ class JobSpec(CoreModel):
repo_code_hash: Optional[str] = None
# `repo_dir` was added in 0.19.27. Default value is set for backward compatibility
repo_dir: str = LEGACY_REPO_DIR
# None for jobs without repo and any jobs submitted by pre-0.20.0 clients
repo_exists_action: Optional[RepoExistsAction] = None
file_archives: list[FileArchiveMapping] = []
# None for non-services and pre-0.19.19 services. See `get_service_port`
service_port: Optional[int] = None
Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/server/schemas/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class SubmitBody(CoreModel):
"working_dir",
"repo_dir",
"repo_data",
"repo_exists_action",
"file_archives",
}
),
Expand Down
13 changes: 13 additions & 0 deletions src/dstack/_internal/server/services/jobs/configurators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
PortMapping,
ProbeConfig,
PythonVersion,
RepoExistsAction,
RunConfigurationType,
ServiceConfiguration,
)
Expand Down Expand Up @@ -169,6 +170,7 @@ async def _get_job_spec(
repo_data=self.run_spec.repo_data,
repo_code_hash=self.run_spec.repo_code_hash,
repo_dir=self._repo_dir(),
repo_exists_action=self._repo_exists_action(),
file_archives=self.run_spec.file_archives,
service_port=self._service_port(),
probes=self._probes(),
Expand Down Expand Up @@ -320,6 +322,17 @@ def _repo_dir(self) -> str:
return LEGACY_REPO_DIR
return repo_dir

def _repo_exists_action(self) -> Optional[RepoExistsAction]:
    """
    Returns the `if_exists` action of the first configured repo,
    or None if the configuration has no `repos`.
    """
    configured_repos = self.run_spec.configuration.repos
    if configured_repos:
        return configured_repos[0].if_exists
    # No `repos` in the configuration. This happens when either:
    # - the configuration has no repo (submitted by any client), or
    # - the configuration has a repo but was submitted by a pre-0.20.0 client, which
    #   always excludes the `repos` option for compatibility with pre-0.20.0 servers.
    # Returning None lets the runner fall back to the "skip" action when needed
    # (relevant to the second case; "skip" is the only action pre-0.20.0 runners had).
    return None

def _working_dir(self) -> Optional[str]:
"""
Returns path or None
Expand Down
Loading