Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/docs/reference/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,7 @@ For more details on the options below, refer to the [server deployment](../guide
- `DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS`{ #DSTACK_SERVER_METRICS_FINISHED_TTL_SECONDS } – Maximum age of metrics samples for finished jobs.
- `DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS } – Maximum age of instance health checks.
- `DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS } – Minimum time interval between consecutive health checks of the same instance.

<!--
TODO: uncomment after dropping DSTACK_FF_EVENTS

- `DSTACK_SERVER_EVENTS_TTL_SECONDS` { #DSTACK_SERVER_EVENTS_TTL_SECONDS } - Maximum age of event records. Set to `0` to disable event storage. Defaults to 30 days.
-->

??? info "Internal environment variables"
The following environment variables are intended for development purposes:
Expand Down
4 changes: 2 additions & 2 deletions src/dstack/_internal/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
get_client_version,
get_server_client_error_details,
)
from dstack._internal.settings import DSTACK_VERSION, FeatureFlags
from dstack._internal.settings import DSTACK_VERSION
from dstack._internal.utils.logging import get_logger
from dstack._internal.utils.ssh import check_required_ssh_version

Expand Down Expand Up @@ -229,7 +229,7 @@ def register_routes(app: FastAPI, ui: bool = True):
app.include_router(model_proxy.router, prefix="/proxy/models", tags=["model-proxy"])
app.include_router(prometheus.router)
app.include_router(files.router)
app.include_router(events.root_router, include_in_schema=FeatureFlags.EVENTS)
app.include_router(events.root_router)

@app.exception_handler(ForbiddenError)
async def forbidden_error_handler(request: Request, exc: ForbiddenError):
Expand Down
4 changes: 1 addition & 3 deletions src/dstack/_internal/server/background/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
process_terminating_jobs,
)
from dstack._internal.server.background.tasks.process_volumes import process_submitted_volumes
from dstack._internal.settings import FeatureFlags

_scheduler = AsyncIOScheduler()

Expand Down Expand Up @@ -71,8 +70,7 @@ def start_background_tasks() -> AsyncIOScheduler:
_scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1))
_scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1)
_scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1)
if FeatureFlags.EVENTS:
_scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1)
_scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1)
if settings.ENABLE_PROMETHEUS_METRICS:
_scheduler.add_job(
collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1
Expand Down
4 changes: 0 additions & 4 deletions src/dstack/_internal/server/routers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from sqlalchemy.ext.asyncio import AsyncSession

import dstack._internal.server.services.events as events_services
from dstack._internal.core.errors import ServerClientError
from dstack._internal.core.models.events import Event
from dstack._internal.server.db import get_session
from dstack._internal.server.models import UserModel
Expand All @@ -12,7 +11,6 @@
CustomORJSONResponse,
get_base_api_additional_responses,
)
from dstack._internal.settings import FeatureFlags

root_router = APIRouter(
prefix="/api/events",
Expand All @@ -36,8 +34,6 @@ async def list_events(
The results are paginated. To get the next page, pass `recorded_at` and `id` of
the last event from the previous page as `prev_recorded_at` and `prev_id`.
"""
if not FeatureFlags.EVENTS:
raise ServerClientError("Events are disabled on this server")
return CustomORJSONResponse(
await events_services.list_events(
session=session,
Expand Down
4 changes: 0 additions & 4 deletions src/dstack/_internal/server/services/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
UserModel,
)
from dstack._internal.server.services.logging import fmt_entity
from dstack._internal.settings import FeatureFlags
from dstack._internal.utils.common import get_current_datetime
from dstack._internal.utils.logging import get_logger

Expand Down Expand Up @@ -170,9 +169,6 @@ def emit(session: AsyncSession, message: str, actor: AnyActor, targets: list[Tar
they will see the entire event with all targets. If this is not desired,
consider emitting multiple separate events instead.
"""
if not FeatureFlags.EVENTS:
return

if not targets:
raise ValueError("At least one target must be specified")
if not message:
Expand Down
31 changes: 31 additions & 0 deletions src/dstack/_internal/server/testing/matchers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import re
import uuid


class SomeUUID4Str:
    """
    Test matcher that compares equal to any canonical UUID4 string.

    Intended for assertions on payloads that contain server-generated IDs,
    e.g. ``assert response.json() == {"id": SomeUUID4Str(), ...}``.

    A value matches only if it is a ``str`` in the lowercase, hyphenated
    8-4-4-4-12 form and ``uuid.UUID`` reports its version as 4. Anything
    else (other types, uppercase hex, other UUID versions) compares unequal.
    """

    # Simplified UUID regex: just checks the 8-4-4-4-12 hex structure
    _uuid_regex = re.compile(
        r"^[0-9a-f]{8}-"
        r"[0-9a-f]{4}-"
        r"[0-9a-f]{4}-"
        r"[0-9a-f]{4}-"
        r"[0-9a-f]{12}$"
    )

    def __eq__(self, other):
        # Only strings can match; every other type is simply "not equal".
        if not isinstance(other, str):
            return False
        # Cheap structural gate before handing the value to the uuid parser.
        if self._uuid_regex.match(other) is None:
            return False
        try:
            parsed = uuid.UUID(other)
        except ValueError:
            return False
        return parsed.version == 4

    def __repr__(self):
        return "SomeUUID4Str()"
3 changes: 0 additions & 3 deletions src/dstack/_internal/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,3 @@ class FeatureFlags:
# DSTACK_FF_AUTOCREATED_FLEETS_ENABLED enables legacy autocreated fleets:
# If there is no fleet suitable for the run, a new fleet is created automatically instead of failing with an error.
AUTOCREATED_FLEETS_ENABLED = os.getenv("DSTACK_FF_AUTOCREATED_FLEETS_ENABLED") is not None

# Server-side flag to enable event emission and Events API
EVENTS = os.getenv("DSTACK_FF_EVENTS") is not None
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from datetime import datetime
from typing import Generator
from unittest.mock import patch

import pytest
Expand All @@ -14,12 +13,6 @@
from dstack._internal.server.testing.common import create_user


@pytest.fixture(autouse=True)
def set_feature_flag() -> Generator[None, None, None]:
with patch("dstack._internal.settings.FeatureFlags.EVENTS", True):
yield


@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_deletes_old_events(test_db, session: AsyncSession) -> None:
Expand Down
7 changes: 0 additions & 7 deletions src/tests/_internal/server/routers/test_events.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import uuid
from datetime import datetime
from typing import Generator
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -29,12 +28,6 @@
]


@pytest.fixture(autouse=True)
def set_feature_flag() -> Generator[None, None, None]:
with patch("dstack._internal.settings.FeatureFlags.EVENTS", True):
yield


class TestListEventsGeneral:
async def test_response_format(self, session: AsyncSession, client: AsyncClient) -> None:
user = await create_user(session=session, name="test_user")
Expand Down
41 changes: 21 additions & 20 deletions src/tests/_internal/server/routers/test_fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
get_remote_connection_info,
get_ssh_fleet_configuration,
)
from dstack._internal.server.testing.matchers import SomeUUID4Str

pytestmark = pytest.mark.usefixtures("image_config_mock")

Expand Down Expand Up @@ -321,16 +322,14 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async
session=session, project=project, user=user, project_role=ProjectRole.USER
)
spec = get_fleet_spec(conf=get_fleet_configuration())
with patch("uuid.uuid4") as m:
m.return_value = UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e")
response = await client.post(
f"/api/project/{project.name}/fleets/apply",
headers=get_auth_headers(user.token),
json={"plan": {"spec": spec.dict()}, "force": False},
)
response = await client.post(
f"/api/project/{project.name}/fleets/apply",
headers=get_auth_headers(user.token),
json={"plan": {"spec": spec.dict()}, "force": False},
)
assert response.status_code == 200
assert response.json() == {
"id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e",
"id": SomeUUID4Str(),
"name": spec.configuration.name,
"project_name": project.name,
"spec": {
Expand Down Expand Up @@ -390,10 +389,10 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async
"status_message": None,
"instances": [
{
"id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e",
"id": SomeUUID4Str(),
"project_name": project.name,
"name": f"{spec.configuration.name}-0",
"fleet_id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e",
"fleet_id": SomeUUID4Str(),
"fleet_name": spec.configuration.name,
"instance_num": 0,
"job_name": None,
Expand All @@ -413,6 +412,8 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async
}
],
}
for instance in response.json()["instances"]:
assert instance["fleet_id"] == response.json()["id"]
res = await session.execute(select(FleetModel))
assert res.scalar_one()
res = await session.execute(select(InstanceModel))
Expand All @@ -435,16 +436,14 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A
network=None,
)
spec = get_fleet_spec(conf=conf)
with patch("uuid.uuid4") as m:
m.return_value = UUID("1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e")
response = await client.post(
f"/api/project/{project.name}/fleets/apply",
headers=get_auth_headers(user.token),
json={"plan": {"spec": spec.dict()}, "force": False},
)
response = await client.post(
f"/api/project/{project.name}/fleets/apply",
headers=get_auth_headers(user.token),
json={"plan": {"spec": spec.dict()}, "force": False},
)
assert response.status_code == 200, response.json()
assert response.json() == {
"id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e",
"id": SomeUUID4Str(),
"name": spec.configuration.name,
"project_name": project.name,
"spec": {
Expand Down Expand Up @@ -512,7 +511,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A
"status_message": None,
"instances": [
{
"id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e",
"id": SomeUUID4Str(),
"project_name": project.name,
"backend": "remote",
"instance_type": {
Expand All @@ -528,7 +527,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A
},
},
"name": f"{spec.configuration.name}-0",
"fleet_id": "1b0e1b45-2f8c-4ab6-8010-a0d1a3e44e0e",
"fleet_id": SomeUUID4Str(),
"fleet_name": spec.configuration.name,
"instance_num": 0,
"job_name": None,
Expand All @@ -546,6 +545,8 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A
}
],
}
for instance in response.json()["instances"]:
assert instance["fleet_id"] == response.json()["id"]
res = await session.execute(select(FleetModel))
assert res.scalar_one()
res = await session.execute(select(InstanceModel))
Expand Down
Loading