diff --git a/docs/docs/reference/environment-variables.md b/docs/docs/reference/environment-variables.md index 04ee67d93c..10bf723b3a 100644 --- a/docs/docs/reference/environment-variables.md +++ b/docs/docs/reference/environment-variables.md @@ -131,12 +131,7 @@ For more details on the options below, refer to the [server deployment](../guide - `DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS`{ #DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS } – Maximum age of metrics samples for finished jobs. - `DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS } – Maximum age of instance health checks. - `DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS } – Minimum time interval between consecutive health checks of the same instance. - - ??? info "Internal environment variables" The following environment variables are intended for development purposes: diff --git a/src/dstack/_internal/server/app.py b/src/dstack/_internal/server/app.py index 5382e8f113..736733b403 100644 --- a/src/dstack/_internal/server/app.py +++ b/src/dstack/_internal/server/app.py @@ -66,7 +66,7 @@ get_client_version, get_server_client_error_details, ) -from dstack._internal.settings import DSTACK_VERSION, FeatureFlags +from dstack._internal.settings import DSTACK_VERSION from dstack._internal.utils.logging import get_logger from dstack._internal.utils.ssh import check_required_ssh_version @@ -229,7 +229,7 @@ def register_routes(app: FastAPI, ui: bool = True): app.include_router(model_proxy.router, prefix="/proxy/models", tags=["model-proxy"]) app.include_router(prometheus.router) app.include_router(files.router) - app.include_router(events.root_router, include_in_schema=FeatureFlags.EVENTS) + app.include_router(events.root_router) @app.exception_handler(ForbiddenError) async def forbidden_error_handler(request: Request, exc: ForbiddenError): diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 2c1a42e857..85af7d3315 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -33,7 +33,6 @@ process_terminating_jobs, ) from dstack._internal.server.background.tasks.process_volumes import process_submitted_volumes -from dstack._internal.settings import FeatureFlags _scheduler = AsyncIOScheduler() @@ -71,8 +70,7 @@ def start_background_tasks() -> AsyncIOScheduler: _scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1)) _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) - if FeatureFlags.EVENTS: - _scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1) + _scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1) if settings.ENABLE_PROMETHEUS_METRICS: _scheduler.add_job( collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 diff --git a/src/dstack/_internal/server/routers/events.py b/src/dstack/_internal/server/routers/events.py index b5472aa0d3..3895767d6f 100644 --- a/src/dstack/_internal/server/routers/events.py +++ b/src/dstack/_internal/server/routers/events.py @@ -2,7 +2,6 @@ from sqlalchemy.ext.asyncio import AsyncSession import dstack._internal.server.services.events as events_services -from dstack._internal.core.errors import ServerClientError from dstack._internal.core.models.events import Event from dstack._internal.server.db import get_session from dstack._internal.server.models import UserModel @@ -12,7 +11,6 @@ CustomORJSONResponse, get_base_api_additional_responses, ) -from dstack._internal.settings import FeatureFlags root_router = APIRouter( prefix="/api/events", @@ -36,8 +34,6 @@ async def list_events( The results are paginated. To get the next page, pass `recorded_at` and `id` of the last event from the previous page as `prev_recorded_at` and `prev_id`. """ - if not FeatureFlags.EVENTS: - raise ServerClientError("Events are disabled on this server") return CustomORJSONResponse( await events_services.list_events( session=session, diff --git a/src/dstack/_internal/server/services/events.py b/src/dstack/_internal/server/services/events.py index 028570fb86..abd2bd9cfb 100644 --- a/src/dstack/_internal/server/services/events.py +++ b/src/dstack/_internal/server/services/events.py @@ -22,7 +22,6 @@ UserModel, ) from dstack._internal.server.services.logging import fmt_entity -from dstack._internal.settings import FeatureFlags from dstack._internal.utils.common import get_current_datetime from dstack._internal.utils.logging import get_logger @@ -170,9 +169,6 @@ def emit(session: AsyncSession, message: str, actor: AnyActor, targets: list[Tar they will see the entire event with all targets. If this is not desired, consider emitting multiple separate events instead. """ - if not FeatureFlags.EVENTS: - return - if not targets: raise ValueError("At least one target must be specified") if not message: diff --git a/src/dstack/_internal/server/testing/matchers.py b/src/dstack/_internal/server/testing/matchers.py new file mode 100644 index 0000000000..0cf610807e --- /dev/null +++ b/src/dstack/_internal/server/testing/matchers.py @@ -0,0 +1,31 @@ +import re +import uuid + + +class SomeUUID4Str: + """ + A matcher that compares equal to any valid UUID4 string + """ + + # Simplified UUID regex: just checks the 8-4-4-4-12 hex structure + _uuid_regex = re.compile( + r"^[0-9a-f]{8}-" + r"[0-9a-f]{4}-" + r"[0-9a-f]{4}-" + r"[0-9a-f]{4}-" + r"[0-9a-f]{12}$" + ) + + def __eq__(self, other): + if isinstance(other, str): + if not self._uuid_regex.match(other): + return False + try: + return uuid.UUID(other).version == 4 + except ValueError: + return False + + return False + + def __repr__(self): + return "SomeUUID4Str()" diff --git a/src/dstack/_internal/settings.py b/src/dstack/_internal/settings.py index bd0a36ba12..245681411d 100644 --- a/src/dstack/_internal/settings.py +++ b/src/dstack/_internal/settings.py @@ -38,6 +38,3 @@ class FeatureFlags: # DSTACK_FF_AUTOCREATED_FLEETS_ENABLED enables legacy autocreated fleets: # If there are no fleet suitable for the run, a new fleet is created automatically instead of an error. AUTOCREATED_FLEETS_ENABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_ENABLED") is not None - - # Server-side flag to enable event emission and Events API - EVENTS = os.getenv("DSTACK_FF_EVENTS") is not None diff --git a/src/tests/_internal/server/background/tasks/test_process_events.py b/src/tests/_internal/server/background/tasks/test_process_events.py index 1dc29167da..899f2946e8 100644 --- a/src/tests/_internal/server/background/tasks/test_process_events.py +++ b/src/tests/_internal/server/background/tasks/test_process_events.py @@ -1,5 +1,4 @@ from datetime import datetime -from typing import Generator from unittest.mock import patch import pytest @@ -14,12 +13,6 @@ from dstack._internal.server.testing.common import create_user -@pytest.fixture(autouse=True) -def set_feature_flag() -> Generator[None, None, None]: - with patch("dstack._internal.settings.FeatureFlags.EVENTS", True): - yield - - @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_deletes_old_events(test_db, session: AsyncSession) -> None: diff --git a/src/tests/_internal/server/routers/test_events.py b/src/tests/_internal/server/routers/test_events.py index 8cdf386661..478474bca7 100644 --- a/src/tests/_internal/server/routers/test_events.py +++ b/src/tests/_internal/server/routers/test_events.py @@ -1,6 +1,5 @@ import uuid from datetime import datetime -from typing import Generator from unittest.mock import patch import pytest @@ -29,12 +28,6 @@ ] -@pytest.fixture(autouse=True) -def set_feature_flag() -> Generator[None, None, None]: - with patch("dstack._internal.settings.FeatureFlags.EVENTS", True): - yield - - class TestListEventsGeneral: async def test_response_format(self, session: AsyncSession, client: AsyncClient) -> None: user = await create_user(session=session, name="test_user") diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 43301aea69..c5b8b7079a 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -47,6 +47,7 @@ get_remote_connection_info, get_ssh_fleet_configuration, ) +from dstack._internal.server.testing.matchers import SomeUUID4Str pytestmark = pytest.mark.usefixtures("image_config_mock") @@ -321,16 +322,14 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async session=session, project=project, user=user, project_role=ProjectRole.USER ) spec = get_fleet_spec(conf=get_fleet_configuration()) - with patch("uuid.uuid4") as m: - m.return_value = UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e") - response = await client.post( - f"/api/project/{project.name}/fleets/apply", - headers=get_auth_headers(user.token), - json={"plan": {"spec": spec.dict()}, "force": False}, - ) + response = await client.post( + f"/api/project/{project.name}/fleets/apply", + headers=get_auth_headers(user.token), + json={"plan": {"spec": spec.dict()}, "force": False}, + ) assert response.status_code == 200 assert response.json() == { - "id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", + "id": SomeUUID4Str(), "name": spec.configuration.name, "project_name": project.name, "spec": { @@ -390,10 +389,10 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async "status_message": None, "instances": [ { - "id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", + "id": SomeUUID4Str(), "project_name": project.name, "name": f"{spec.configuration.name}-0", - "fleet_id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", + "fleet_id": SomeUUID4Str(), "fleet_name": spec.configuration.name, "instance_num": 0, "job_name": None, @@ -413,6 +412,8 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async } ], } + for instance in response.json()["instances"]: + assert instance["fleet_id"] == response.json()["id"] res = await session.execute(select(FleetModel)) assert res.scalar_one() res = await session.execute(select(InstanceModel)) @@ -435,16 +436,14 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A network=None, ) spec = get_fleet_spec(conf=conf) - with patch("uuid.uuid4") as m: - m.return_value = UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e") - response = await client.post( - f"/api/project/{project.name}/fleets/apply", - headers=get_auth_headers(user.token), - json={"plan": {"spec": spec.dict()}, "force": False}, - ) + response = await client.post( + f"/api/project/{project.name}/fleets/apply", + headers=get_auth_headers(user.token), + json={"plan": {"spec": spec.dict()}, "force": False}, + ) assert response.status_code == 200, response.json() assert response.json() == { - "id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", + "id": SomeUUID4Str(), "name": spec.configuration.name, "project_name": project.name, "spec": { @@ -512,7 +511,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A "status_message": None, "instances": [ { - "id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", + "id": SomeUUID4Str(), "project_name": project.name, "backend": "remote", "instance_type": { @@ -528,7 +527,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A }, }, "name": f"{spec.configuration.name}-0", - "fleet_id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", + "fleet_id": SomeUUID4Str(), "fleet_name": spec.configuration.name, "instance_num": 0, "job_name": None, @@ -546,6 +545,8 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A } ], } + for instance in response.json()["instances"]: + assert instance["fleet_id"] == response.json()["id"] res = await session.execute(select(FleetModel)) assert res.scalar_one() res = await session.execute(select(InstanceModel)) diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index a5a86868c6..77dada59af 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -64,6 +64,7 @@ get_job_provisioning_data, get_run_spec, ) +from dstack._internal.server.testing.matchers import SomeUUID4Str from tests._internal.server.background.tasks.test_process_running_jobs import settings pytestmark = pytest.mark.usefixtures("image_config_mock") @@ -301,8 +302,8 @@ def get_dev_env_run_plan_dict( def get_dev_env_run_dict( - run_id: str = "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", - job_id: str = "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", + run_id: Union[str, SomeUUID4Str] = "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", + job_id: Union[str, SomeUUID4Str] = "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e", project_name: str = "test_project", username: str = "test_user", run_name: Optional[str] = "run_name", @@ -1418,14 +1419,13 @@ async def test_submits_new_run_if_no_current_resource( await add_project_member( session=session, project=project, user=user, project_role=ProjectRole.USER ) - run_id = UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e") submitted_at = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc) submitted_at_formatted = "2023-01-02T03:04:00+00:00" last_processed_at_formatted = submitted_at_formatted repo = await create_repo(session=session, project_id=project.id) run_dict = get_dev_env_run_dict( - run_id=str(run_id), - job_id=str(run_id), + run_id=SomeUUID4Str(), + job_id=SomeUUID4Str(), project_name=project.name, username=user.name, submitted_at=submitted_at_formatted, @@ -1434,11 +1434,7 @@ async def test_submits_new_run_if_no_current_resource( run_name="test-run", repo_id=repo.name, ) - with ( - patch("uuid.uuid4") as uuid_mock, - patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock, - ): - uuid_mock.return_value = run_id + with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: datetime_mock.return_value = submitted_at response = await client.post( f"/api/project/{project.name}/runs/apply", @@ -1604,14 +1600,13 @@ async def test_submits_run( await add_project_member( session=session, project=project, user=user, project_role=ProjectRole.USER ) - run_id = UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e") submitted_at = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc) submitted_at_formatted = "2023-01-02T03:04:00+00:00" last_processed_at_formatted = submitted_at_formatted repo = await create_repo(session=session, project_id=project.id) run_dict = get_dev_env_run_dict( - run_id=str(run_id), - job_id=str(run_id), + run_id=SomeUUID4Str(), + job_id=SomeUUID4Str(), project_name=project.name, username=user.name, submitted_at=submitted_at_formatted, @@ -1625,11 +1620,7 @@ async def test_submits_run( if privileged is None: del run_spec["configuration"]["privileged"] body = {"run_spec": run_spec} - with ( - patch("uuid.uuid4") as uuid_mock, - patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock, - ): - uuid_mock.return_value = run_id + with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: datetime_mock.return_value = submitted_at response = await client.post( f"/api/project/{project.name}/runs/submit", @@ -1655,14 +1646,13 @@ async def test_submits_run_docker_true( await add_project_member( session=session, project=project, user=user, project_role=ProjectRole.USER ) - run_id = UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e") submitted_at = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc) submitted_at_formatted = "2023-01-02T03:04:00+00:00" last_processed_at_formatted = submitted_at_formatted repo = await create_repo(session=session, project_id=project.id) run_dict = get_dev_env_run_dict( - run_id=str(run_id), - job_id=str(run_id), + run_id=SomeUUID4Str(), + job_id=SomeUUID4Str(), project_name=project.name, username=user.name, submitted_at=submitted_at_formatted, @@ -1674,11 +1664,7 @@ async def test_submits_run_docker_true( privileged=True, # docker=True automatically enables privileged mode ) body = {"run_spec": run_dict["run_spec"]} - with ( - patch("uuid.uuid4") as uuid_mock, - patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock, - ): - uuid_mock.return_value = run_id + with patch("dstack._internal.utils.common.get_current_datetime") as datetime_mock: datetime_mock.return_value = submitted_at response = await client.post( f"/api/project/{project.name}/runs/submit", @@ -1712,13 +1698,11 @@ async def test_submits_run_without_run_name( repo_id=repo.name, ) body = {"run_spec": run_dict["run_spec"]} - with patch("uuid.uuid4") as uuid_mock: - uuid_mock.return_value = UUID(run_dict["id"]) - response = await client.post( - f"/api/project/{project.name}/runs/submit", - headers=get_auth_headers(user.token), - json=body, - ) + response = await client.post( + f"/api/project/{project.name}/runs/submit", + headers=get_auth_headers(user.token), + json=body, + ) assert response.status_code == 200 assert response.json()["run_spec"]["run_name"] is not None res = await session.execute(select(RunModel))