diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index efd29636a5..774f244543 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,6 +10,12 @@ repos: rev: v2.6.2 # Should match .github/workflows/build-artifacts.yml hooks: - id: golangci-lint-full + alias: runner-fix + language_version: 1.25.0 # Should match runner/go.mod + entry: bash -c 'cd runner && golangci-lint run --fix' + stages: [manual] + - id: golangci-lint-full + alias: runner-lint language_version: 1.25.0 # Should match runner/go.mod entry: bash -c 'cd runner && golangci-lint run' stages: [manual] diff --git a/docs/docs/reference/dstack.yml/dev-environment.md b/docs/docs/reference/dstack.yml/dev-environment.md index 364297e431..0c02374ee3 100644 --- a/docs/docs/reference/dstack.yml/dev-environment.md +++ b/docs/docs/reference/dstack.yml/dev-environment.md @@ -112,6 +112,17 @@ The `dev-environment` configuration type allows running [dev environments](../.. type: required: true +??? info "`if_exists` action" + + If the `path` already exists and is a non-empty directory, by default the run is terminated with an error. + This can be changed with the `if_exists` option: + + * `error` – do not try to check out, terminate the run with an error (the default action since `0.20.0`) + * `skip` – do not try to check out, skip the repo (the only action available before `0.20.0`) + + Note, if the `path` exists and is _not_ a directory (e.g., a regular file), this is always an error that + cannot be ignored with the `skip` action. + ??? info "Short syntax" The short syntax for repos is a colon-separated string in the form of `local_path_or_url:path`. diff --git a/docs/docs/reference/dstack.yml/service.md b/docs/docs/reference/dstack.yml/service.md index 9c9a34ee0e..601a88291f 100644 --- a/docs/docs/reference/dstack.yml/service.md +++ b/docs/docs/reference/dstack.yml/service.md @@ -227,6 +227,17 @@ The `service` configuration type allows running [services](../../concepts/servic type: required: true +??? info "`if_exists` action" + + If the `path` already exists and is a non-empty directory, by default the run is terminated with an error. + This can be changed with the `if_exists` option: + + * `error` – do not try to check out, terminate the run with an error (the default action since `0.20.0`) + * `skip` – do not try to check out, skip the repo (the only action available before `0.20.0`) + + Note, if the `path` exists and is _not_ a directory (e.g., a regular file), this is always an error that + cannot be ignored with the `skip` action. + ??? info "Short syntax" The short syntax for repos is a colon-separated string in the form of `local_path_or_url:path`. diff --git a/docs/docs/reference/dstack.yml/task.md b/docs/docs/reference/dstack.yml/task.md index 3563833196..01bc7580b2 100644 --- a/docs/docs/reference/dstack.yml/task.md +++ b/docs/docs/reference/dstack.yml/task.md @@ -112,6 +112,17 @@ The `task` configuration type allows running [tasks](../../concepts/tasks.md). type: required: true +??? info "`if_exists` action" + + If the `path` already exists and is a non-empty directory, by default the run is terminated with an error. + This can be changed with the `if_exists` option: + + * `error` – do not try to check out, terminate the run with an error (the default action since `0.20.0`) + * `skip` – do not try to check out, skip the repo (the only action available before `0.20.0`) + + Note, if the `path` exists and is _not_ a directory (e.g., a regular file), this is always an error that + cannot be ignored with the `skip` action. + ??? info "Short syntax" The short syntax for repos is a colon-separated string in the form of `local_path_or_url:path`. diff --git a/runner/internal/executor/repo.go b/runner/internal/executor/repo.go index 5203190c14..60a52293fe 100644 --- a/runner/internal/executor/repo.go +++ b/runner/internal/executor/repo.go @@ -13,6 +13,7 @@ import ( "github.com/dstackai/dstack/runner/internal/common" "github.com/dstackai/dstack/runner/internal/log" "github.com/dstackai/dstack/runner/internal/repo" + "github.com/dstackai/dstack/runner/internal/schemas" ) // setupRepo must be called from Run @@ -36,13 +37,27 @@ func (ex *RunExecutor) setupRepo(ctx context.Context) error { } log.Trace(ctx, "Job repo dir", "path", ex.repoDir) - shouldCheckout, err := ex.shouldCheckout(ctx) + repoDirIsEmpty, err := ex.prepareRepoDir(ctx) if err != nil { - return fmt.Errorf("check if checkout needed: %w", err) - } - if !shouldCheckout { - log.Info(ctx, "skipping repo checkout: repo dir is not empty") - return nil + return fmt.Errorf("prepare repo dir: %w", err) + } + if !repoDirIsEmpty { + var repoExistsAction schemas.RepoExistsAction + if ex.jobSpec.RepoExistsAction != nil { + repoExistsAction = *ex.jobSpec.RepoExistsAction + } else { + log.Debug(ctx, "repo_exists_action is not set, using legacy 'skip' action") + repoExistsAction = schemas.RepoExistsActionSkip + } + switch repoExistsAction { + case schemas.RepoExistsActionError: + return fmt.Errorf("setup repo: repo dir is not empty: %s", ex.repoDir) + case schemas.RepoExistsActionSkip: + log.Info(ctx, "Skipping repo checkout: repo dir is not empty", "path", ex.repoDir) + return nil + default: + return fmt.Errorf("setup repo: unsupported action: %s", repoExistsAction) + } } // Move existing repo files from the repo dir and back to be able to git clone. // Currently, only needed for volumes mounted inside repo with lost+found present. @@ -143,11 +158,11 @@ func (ex *RunExecutor) prepareArchive(ctx context.Context) error { return nil } -func (ex *RunExecutor) shouldCheckout(ctx context.Context) (bool, error) { - log.Trace(ctx, "checking if repo checkout is needed") +func (ex *RunExecutor) prepareRepoDir(ctx context.Context) (bool, error) { + log.Trace(ctx, "Preparing repo dir") info, err := os.Stat(ex.repoDir) if err != nil { - if os.IsNotExist(err) { + if errors.Is(err, os.ErrNotExist) { if err = common.MkdirAll(ctx, ex.repoDir, ex.jobUid, ex.jobGid); err != nil { return false, fmt.Errorf("create repo dir: %w", err) } @@ -157,24 +172,22 @@ func (ex *RunExecutor) shouldCheckout(ctx context.Context) (bool, error) { return false, fmt.Errorf("stat repo dir: %w", err) } if !info.IsDir() { - return false, fmt.Errorf("failed to set up repo dir: %s is not a dir", ex.repoDir) + return false, fmt.Errorf("stat repo dir: %s is not a dir", ex.repoDir) } entries, err := os.ReadDir(ex.repoDir) if err != nil { return false, fmt.Errorf("read repo dir: %w", err) } if len(entries) == 0 { - // Repo dir existed but was empty, e.g. a volume without repo + // Repo dir is empty return true, nil } - if len(entries) > 1 { - // Repo already checked out, e.g. a volume with repo - return false, nil - } - if entries[0].Name() == "lost+found" { + if len(entries) == 1 && entries[0].Name() == "lost+found" { // lost+found may be present on a newly created volume + // We (but not Git, see `{move,restore}RepoDir`) consider such a dir "empty" return true, nil } + // Repo dir is not empty return false, nil } diff --git a/runner/internal/schemas/schemas.go b/runner/internal/schemas/schemas.go index 4a92702eeb..106bc61f87 100644 --- a/runner/internal/schemas/schemas.go +++ b/runner/internal/schemas/schemas.go @@ -71,8 +71,9 @@ type JobSpec struct { // `RepoData` is optional for compatibility with jobs submitted before 0.19.17. // Use `RunExecutor.getRepoData()` to get non-nil `RepoData`. // TODO: make required when supporting jobs submitted before 0.19.17 is no longer relevant. - RepoData *RepoData `json:"repo_data"` - FileArchives []FileArchive `json:"file_archives"` + RepoData *RepoData `json:"repo_data"` + RepoExistsAction *RepoExistsAction `json:"repo_exists_action"` + FileArchives []FileArchive `json:"file_archives"` } type ClusterInfo struct { @@ -102,6 +103,13 @@ type RepoData struct { RepoConfigEmail string `json:"repo_config_email"` } +type RepoExistsAction string + +const ( + RepoExistsActionError RepoExistsAction = "error" + RepoExistsActionSkip RepoExistsAction = "skip" +) + type FileArchive struct { Id string `json:"id"` Path string `json:"path"` diff --git a/scripts/docs/gen_schema_reference.py b/scripts/docs/gen_schema_reference.py index acd4e722d8..e7c2cd62e6 100644 --- a/scripts/docs/gen_schema_reference.py +++ b/scripts/docs/gen_schema_reference.py @@ -6,6 +6,7 @@ import inspect import logging import re +from enum import Enum from fnmatch import fnmatch import mkdocs_gen_files @@ -63,11 +64,14 @@ def generate_schema_reference( ] ) for name, field in cls.__fields__.items(): + default = field.default + if isinstance(default, Enum): + default = default.value values = dict( name=name, description=field.field_info.description, type=get_type(field.annotation), - default=field.default, + default=default, required=field.required, ) # TODO: If the field doesn't have description (e.g. BaseConfiguration.type), we could fallback to docstring diff --git a/src/dstack/_internal/core/compatibility/runs.py b/src/dstack/_internal/core/compatibility/runs.py index e0f9989ca2..0cf9cfb9e2 100644 --- a/src/dstack/_internal/core/compatibility/runs.py +++ b/src/dstack/_internal/core/compatibility/runs.py @@ -178,7 +178,6 @@ def get_run_spec_excludes(run_spec: RunSpec) -> IncludeExcludeDictType: configuration_excludes["schedule"] = True if profile is not None and profile.schedule is None: profile_excludes.add("schedule") - configuration_excludes["repos"] = True if configuration_excludes: spec_excludes["configuration"] = configuration_excludes diff --git a/src/dstack/_internal/core/models/configurations.py b/src/dstack/_internal/core/models/configurations.py index 6fe8132de9..60edd51c38 100644 --- a/src/dstack/_internal/core/models/configurations.py +++ b/src/dstack/_internal/core/models/configurations.py @@ -31,7 +31,7 @@ from dstack._internal.core.models.services import AnyModel, OpenAIChatModel from dstack._internal.core.models.unix import UnixUser from dstack._internal.core.models.volumes import MountPoint, VolumeConfiguration, parse_mount_point -from dstack._internal.utils.common import has_duplicates +from dstack._internal.utils.common import has_duplicates, list_enum_values_for_annotation from dstack._internal.utils.json_schema import add_extra_schema_types from dstack._internal.utils.json_utils import ( pydantic_orjson_dumps_with_indent, @@ -95,6 +95,13 @@ def parse(cls, v: str) -> "PortMapping": return PortMapping(local_port=local_port, container_port=int(container_port)) +class RepoExistsAction(str, Enum): + # Don't try to check out, terminate the run with an error (the default action since 0.20.0) + ERROR = "error" + # Don't try to check out, skip the repo (the logic hardcoded in the pre-0.20.0 runner) + SKIP = "skip" + + class RepoSpec(CoreModel): local_path: Annotated[ Optional[str], @@ -132,6 +139,15 @@ class RepoSpec(CoreModel): ) ), ] = None + if_exists: Annotated[ + RepoExistsAction, + Field( + description=( + "The action to be taken if `path` exists and is not empty." + f" One of: {list_enum_values_for_annotation(RepoExistsAction)}" + ), + ), + ] = RepoExistsAction.ERROR @classmethod def parse(cls, v: str) -> Self: diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 5e6d4b4806..ec657e7987 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -21,6 +21,7 @@ AnyRunConfiguration, HTTPHeaderSpec, HTTPMethod, + RepoExistsAction, RunConfiguration, ServiceConfiguration, ) @@ -281,6 +282,8 @@ class JobSpec(CoreModel): repo_code_hash: Optional[str] = None # `repo_dir` was added in 0.19.27. Default value is set for backward compatibility repo_dir: str = LEGACY_REPO_DIR + # None for jobs without repo and any jobs submitted by pre-0.20.0 clients + repo_exists_action: Optional[RepoExistsAction] = None file_archives: list[FileArchiveMapping] = [] # None for non-services and pre-0.19.19 services. See `get_service_port` service_port: Optional[int] = None diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py index f88cf47a82..f3c3614b58 100644 --- a/src/dstack/_internal/server/schemas/runner.py +++ b/src/dstack/_internal/server/schemas/runner.py @@ -80,6 +80,7 @@ class SubmitBody(CoreModel): "working_dir", "repo_dir", "repo_data", + "repo_exists_action", "file_archives", } ), diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index 69e8f898c6..4cc2c9079e 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -20,6 +20,7 @@ PortMapping, ProbeConfig, PythonVersion, + RepoExistsAction, RunConfigurationType, ServiceConfiguration, ) @@ -169,6 +170,7 @@ async def _get_job_spec( repo_data=self.run_spec.repo_data, repo_code_hash=self.run_spec.repo_code_hash, repo_dir=self._repo_dir(), + repo_exists_action=self._repo_exists_action(), file_archives=self.run_spec.file_archives, service_port=self._service_port(), probes=self._probes(), @@ -320,6 +322,17 @@ def _repo_dir(self) -> str: return LEGACY_REPO_DIR return repo_dir + def _repo_exists_action(self) -> Optional[RepoExistsAction]: + if not (repos := self.run_spec.configuration.repos): + # One of: + # - The configuration without repo submitted by any client. + # - The configuration _with_ repo submitted by pre-0.20.0 client (the `repos` option + # is always excluded by pre-0.20.0 clients for compatibility with pre-0.20.0 servers) + # In either case, we return None, and runner falls back to "skip" action if needed + # (the second case, the only action hardcoded in pre-0.20.0 runners) + return None + return repos[0].if_exists + def _working_dir(self) -> Optional[str]: """ Returns path or None diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index dc27ef107c..bdca700e9a 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -161,7 +161,16 @@ def get_dev_env_run_plan_dict( "shm_size": None, }, "volumes": [json.loads(v.json()) for v in volumes], - "repos": [], + "repos": [ + { + "url": "https://github.com/dstackai/dstack", + "branch": None, + "hash": None, + "local_path": None, + "path": "~/repo", + "if_exists": "error", + }, + ], "files": [], "backends": ["local", "aws", "azure", "gcp", "lambda", "runpod"], "regions": ["us"], @@ -209,7 +218,14 @@ def get_dev_env_run_plan_dict( "tags": None, }, "repo_code_hash": None, - "repo_data": {"repo_dir": "/repo", "repo_type": "local"}, + "repo_data": { + "repo_type": "remote", + "repo_name": "dstack", + "repo_branch": None, + "repo_hash": None, + "repo_config_name": None, + "repo_config_email": None, + }, "repo_id": repo_id, "repo_dir": "~/repo", "run_name": run_name, @@ -258,8 +274,16 @@ def get_dev_env_run_plan_dict( "ssh_key": None, "working_dir": None, "repo_code_hash": None, - "repo_data": {"repo_dir": "/repo", "repo_type": "local"}, + "repo_data": { + "repo_type": "remote", + "repo_name": "dstack", + "repo_branch": None, + "repo_hash": None, + "repo_config_name": None, + "repo_config_email": None, + }, "repo_dir": "~/repo", + "repo_exists_action": "error", "file_archives": [], "service_port": None, "probes": [], @@ -375,7 +399,16 @@ def get_dev_env_run_dict( "shm_size": None, }, "volumes": [], - "repos": [], + "repos": [ + { + "url": "https://github.com/dstackai/dstack", + "branch": None, + "hash": None, + "local_path": None, + "path": "~/repo", + "if_exists": "error", + }, + ], "files": [], "backends": ["local", "aws", "azure", "gcp", "lambda"], "regions": ["us"], @@ -423,7 +456,14 @@ def get_dev_env_run_dict( "tags": None, }, "repo_code_hash": None, - "repo_data": {"repo_dir": "/repo", "repo_type": "local"}, + "repo_data": { + "repo_type": "remote", + "repo_name": "dstack", + "repo_branch": None, + "repo_hash": None, + "repo_config_name": None, + "repo_config_email": None, + }, "repo_id": repo_id, "repo_dir": "~/repo", "run_name": run_name, @@ -467,8 +507,16 @@ def get_dev_env_run_dict( "ssh_key": None, "working_dir": None, "repo_code_hash": None, - "repo_data": {"repo_dir": "/repo", "repo_type": "local"}, + "repo_data": { + "repo_type": "remote", + "repo_name": "dstack", + "repo_branch": None, + "repo_hash": None, + "repo_config_name": None, + "repo_config_email": None, + }, "repo_dir": "~/repo", + "repo_exists_action": "error", "file_archives": [], "service_port": None, "probes": [], @@ -535,6 +583,16 @@ def get_service_run_spec( "port": 8000, "gateway": gateway, "model": "test-model", + "repos": [ + { + "url": "https://github.com/dstackai/dstack", + "branch": None, + "hash": None, + "local_path": None, + "path": "~/repo", + "if_exists": "error", + }, + ], }, "configuration_path": "dstack.yaml", "file_archives": [], @@ -542,8 +600,16 @@ def get_service_run_spec( "name": "string", }, "repo_code_hash": None, - "repo_data": {"repo_dir": "/repo", "repo_type": "local"}, + "repo_data": { + "repo_type": "remote", + "repo_name": "dstack", + "repo_branch": None, + "repo_hash": None, + "repo_config_name": None, + "repo_config_email": None, + }, "repo_id": repo_id, + "repo_dir": "~/repo", "run_name": run_name, "ssh_key_pub": "ssh_key", "working_dir": None, @@ -957,10 +1023,9 @@ async def test_returns_403_if_not_project_member( assert response.status_code == 403 @pytest.mark.asyncio - @pytest.mark.parametrize("privileged", [False]) @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_returns_run_plan_privileged_false( - self, test_db, session: AsyncSession, client: AsyncClient, privileged: bool + self, test_db, session: AsyncSession, client: AsyncClient ): user = await create_user(session=session, global_role=GlobalRole.USER) project = await create_project(session=session, owner=user) @@ -995,12 +1060,9 @@ async def test_returns_run_plan_privileged_false( offers=[offer_aws, offer_runpod], total_offers=2, max_price=2.0, - privileged=privileged, + privileged=False, ) - run_spec = copy.deepcopy(run_plan_dict["run_spec"]) - if privileged is None: - del run_spec["configuration"]["privileged"] - body = {"run_spec": run_spec} + body = {"run_spec": run_plan_dict["run_spec"]} with patch("dstack._internal.server.services.backends.get_project_backends") as m: backend_mock_aws = Mock() backend_mock_aws.TYPE = BackendType.AWS