From c747a568d924c4af7a95a6fb4bff6a23ddc8e0fb Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Wed, 26 Nov 2025 01:27:23 +0100 Subject: [PATCH 1/3] Events emission framework and API - Add the framework for event emission - Add the events API - Add a few sample events about project / user / fleet / instance / run / job creation. --- src/dstack/_internal/core/models/events.py | 33 + src/dstack/_internal/server/app.py | 2 + .../_internal/server/background/__init__.py | 2 + .../server/background/tasks/process_events.py | 17 + ...7d4a29cd38_add_events_and_event_targets.py | 91 ++ src/dstack/_internal/server/models.py | 40 + src/dstack/_internal/server/routers/events.py | 54 + src/dstack/_internal/server/routers/users.py | 1 + src/dstack/_internal/server/schemas/events.py | 30 + .../_internal/server/services/events.py | 327 +++++ .../_internal/server/services/fleets.py | 14 + .../_internal/server/services/projects.py | 8 +- .../server/services/runs/__init__.py | 16 +- src/dstack/_internal/server/services/users.py | 8 + src/dstack/_internal/server/settings.py | 3 + .../background/tasks/test_process_events.py | 49 + .../_internal/server/routers/test_events.py | 1135 +++++++++++++++++ 17 files changed, 1828 insertions(+), 2 deletions(-) create mode 100644 src/dstack/_internal/core/models/events.py create mode 100644 src/dstack/_internal/server/background/tasks/process_events.py create mode 100644 src/dstack/_internal/server/migrations/versions/f27d4a29cd38_add_events_and_event_targets.py create mode 100644 src/dstack/_internal/server/routers/events.py create mode 100644 src/dstack/_internal/server/schemas/events.py create mode 100644 src/dstack/_internal/server/services/events.py create mode 100644 src/tests/_internal/server/background/tasks/test_process_events.py create mode 100644 src/tests/_internal/server/routers/test_events.py diff --git a/src/dstack/_internal/core/models/events.py b/src/dstack/_internal/core/models/events.py new file mode 100644 index 0000000000..fce57e87fb --- /dev/null +++ b/src/dstack/_internal/core/models/events.py @@ -0,0 +1,33 @@ +# TODO: docs + +import uuid +from datetime import datetime +from enum import Enum +from typing import Optional + +from dstack._internal.core.models.common import CoreModel + + +class EventTargetType(str, Enum): + PROJECT = "project" + USER = "user" + FLEET = "fleet" + INSTANCE = "instance" + RUN = "run" + JOB = "job" + + +class EventTarget(CoreModel): + type: str # Holds EventTargetType; str for adding new types without breaking compatibility + project_id: Optional[uuid.UUID] + id: uuid.UUID + name: str + + +class Event(CoreModel): + id: uuid.UUID + message: str + recorded_at: datetime + actor_user_id: Optional[uuid.UUID] + actor_user: Optional[str] + targets: list[EventTarget] diff --git a/src/dstack/_internal/server/app.py b/src/dstack/_internal/server/app.py index d9b906bc66..736733b403 100644 --- a/src/dstack/_internal/server/app.py +++ b/src/dstack/_internal/server/app.py @@ -26,6 +26,7 @@ from dstack._internal.server.db import get_db, get_session_ctx, migrate from dstack._internal.server.routers import ( backends, + events, files, fleets, gateways, @@ -228,6 +229,7 @@ def register_routes(app: FastAPI, ui: bool = True): app.include_router(model_proxy.router, prefix="/proxy/models", tags=["model-proxy"]) app.include_router(prometheus.router) app.include_router(files.router) + app.include_router(events.root_router) @app.exception_handler(ForbiddenError) async def forbidden_error_handler(request: Request, exc: ForbiddenError): diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index df7d41b9d9..85af7d3315 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -3,6 +3,7 @@ from dstack._internal.server import settings from dstack._internal.server.background.tasks.process_compute_groups import process_compute_groups +from dstack._internal.server.background.tasks.process_events import delete_events from dstack._internal.server.background.tasks.process_fleets import process_fleets from dstack._internal.server.background.tasks.process_gateways import ( process_gateways, @@ -69,6 +70,7 @@ def start_background_tasks() -> AsyncIOScheduler: _scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1)) _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) + _scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1) if settings.ENABLE_PROMETHEUS_METRICS: _scheduler.add_job( collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 diff --git a/src/dstack/_internal/server/background/tasks/process_events.py b/src/dstack/_internal/server/background/tasks/process_events.py new file mode 100644 index 0000000000..22df5bcf33 --- /dev/null +++ b/src/dstack/_internal/server/background/tasks/process_events.py @@ -0,0 +1,17 @@ +from datetime import timedelta + +from sqlalchemy import delete + +from dstack._internal.server import settings +from dstack._internal.server.db import get_session_ctx +from dstack._internal.server.models import EventModel +from dstack._internal.server.utils import sentry_utils +from dstack._internal.utils.common import get_current_datetime + + +@sentry_utils.instrument_background_task +async def delete_events(): + cutoff = get_current_datetime() - timedelta(seconds=settings.SERVER_EVENTS_TTL_SECONDS) + stmt = delete(EventModel).where(EventModel.recorded_at < cutoff) + async with get_session_ctx() as session: + await session.execute(stmt) diff --git a/src/dstack/_internal/server/migrations/versions/f27d4a29cd38_add_events_and_event_targets.py b/src/dstack/_internal/server/migrations/versions/f27d4a29cd38_add_events_and_event_targets.py new file mode 100644 index 0000000000..38d41ab4d6 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/f27d4a29cd38_add_events_and_event_targets.py @@ -0,0 +1,91 @@ +"""Add events and event_targets + +Revision ID: f27d4a29cd38 +Revises: 7d1ec2b920ac +Create Date: 2025-11-26 01:01:46.305815 + +""" + +import sqlalchemy as sa +import sqlalchemy_utils +from alembic import op + +import dstack._internal.server.models + +# revision identifiers, used by Alembic. +revision = "f27d4a29cd38" +down_revision = "7d1ec2b920ac" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "events", + sa.Column("id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False), + sa.Column("message", sa.Text(), nullable=False), + sa.Column("recorded_at", dstack._internal.server.models.NaiveDateTime(), nullable=False), + sa.Column( + "actor_user_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=True + ), + sa.ForeignKeyConstraint( + ["actor_user_id"], + ["users.id"], + name=op.f("fk_events_actor_user_id_users"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_events")), + ) + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_events_actor_user_id"), ["actor_user_id"], unique=False + ) + batch_op.create_index(batch_op.f("ix_events_recorded_at"), ["recorded_at"], unique=False) + + op.create_table( + "event_targets", + sa.Column("id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False), + sa.Column("event_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False), + sa.Column( + "entity_project_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=True + ), + sa.Column("entity_type", sa.String(length=100), nullable=False), + sa.Column("entity_id", sqlalchemy_utils.types.uuid.UUIDType(binary=False), nullable=False), + sa.Column("entity_name", sa.String(length=200), nullable=False), + sa.ForeignKeyConstraint( + ["entity_project_id"], + ["projects.id"], + name=op.f("fk_event_targets_entity_project_id_projects"), + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["event_id"], + ["events.id"], + name=op.f("fk_event_targets_event_id_events"), + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id", name=op.f("pk_event_targets")), + ) + with op.batch_alter_table("event_targets", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_event_targets_entity_project_id"), ["entity_project_id"], unique=False + ) + batch_op.create_index(batch_op.f("ix_event_targets_event_id"), ["event_id"], unique=False) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("event_targets", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_event_targets_event_id")) + batch_op.drop_index(batch_op.f("ix_event_targets_entity_project_id")) + + op.drop_table("event_targets") + with op.batch_alter_table("events", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_events_recorded_at")) + batch_op.drop_index(batch_op.f("ix_events_actor_user_id")) + + op.drop_table("events") + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index e88f83d599..e88d3ce55e 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -26,6 +26,7 @@ from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import CoreConfig, generate_dual_core_model from dstack._internal.core.models.compute_groups import ComputeGroupStatus +from dstack._internal.core.models.events import EventTargetType from dstack._internal.core.models.fleets import FleetStatus from dstack._internal.core.models.gateways import GatewayStatus from dstack._internal.core.models.health import HealthStatus @@ -847,3 +848,42 @@ class SecretModel(BaseModel): name: Mapped[str] = mapped_column(String(200)) value: Mapped[DecryptedString] = mapped_column(EncryptedString()) + + +class EventModel(BaseModel): + __tablename__ = "events" + + id: Mapped[uuid.UUID] = mapped_column( + UUIDType(binary=False), primary_key=True, default=uuid.uuid4 + ) + message: Mapped[str] = mapped_column(Text) + recorded_at: Mapped[datetime] = mapped_column(NaiveDateTime, index=True) + + actor_user_id: Mapped[Optional[uuid.UUID]] = mapped_column( + ForeignKey("users.id", ondelete="CASCADE"), nullable=True, index=True + ) + user: Mapped[Optional["UserModel"]] = relationship() + + targets: Mapped[List["EventTargetModel"]] = relationship(back_populates="event") + + +class EventTargetModel(BaseModel): + __tablename__ = "event_targets" + + id: Mapped[uuid.UUID] = mapped_column( + UUIDType(binary=False), primary_key=True, default=uuid.uuid4 + ) + + event_id: Mapped[uuid.UUID] = mapped_column( + ForeignKey("events.id", ondelete="CASCADE"), index=True + ) + event: Mapped["EventModel"] = relationship() + + entity_project_id: Mapped[Optional[uuid.UUID]] = mapped_column( + ForeignKey("projects.id", ondelete="CASCADE"), nullable=True, index=True + ) + entity_project: Mapped[Optional["ProjectModel"]] = relationship() + + entity_type: Mapped[EventTargetType] = mapped_column(EnumAsString(EventTargetType, 100)) + entity_id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False)) + entity_name: Mapped[str] = mapped_column(String(200)) diff --git a/src/dstack/_internal/server/routers/events.py b/src/dstack/_internal/server/routers/events.py new file mode 100644 index 0000000000..e4a55e461b --- /dev/null +++ b/src/dstack/_internal/server/routers/events.py @@ -0,0 +1,54 @@ +from fastapi import APIRouter, Depends +from sqlalchemy.ext.asyncio import AsyncSession + +import dstack._internal.server.services.events as events_services +from dstack._internal.core.models.events import Event +from dstack._internal.server.db import get_session +from dstack._internal.server.models import UserModel +from dstack._internal.server.schemas.events import ListEventsRequest +from dstack._internal.server.security.permissions import Authenticated +from dstack._internal.server.utils.routers import ( + CustomORJSONResponse, + get_base_api_additional_responses, +) + +root_router = APIRouter( + prefix="/api/events", + tags=["events"], + responses=get_base_api_additional_responses(), +) + + +@root_router.post("/list", response_model=list[Event]) +async def list_events( + body: ListEventsRequest, + session: AsyncSession = Depends(get_session), + user: UserModel = Depends(Authenticated()), +): + """ + Returns events visible to the current user. + + The results are paginated. To get the next page, pass `recorded_at` and `id` of + the last event from the previous page as `prev_recorded_at` and `prev_id`. + """ + return CustomORJSONResponse( + await events_services.list_events( + session=session, + user=user, + target_projects=body.target_projects, + target_users=body.target_users, + target_fleets=body.target_fleets, + target_instances=body.target_instances, + target_runs=body.target_runs, + target_jobs=body.target_jobs, + within_projects=body.within_projects, + within_fleets=body.within_fleets, + within_runs=body.within_runs, + include_target_types=body.include_target_types, + actors=body.actors, + prev_recorded_at=body.prev_recorded_at, + prev_id=body.prev_id, + limit=body.limit, + ascending=body.ascending, + ) + ) diff --git a/src/dstack/_internal/server/routers/users.py b/src/dstack/_internal/server/routers/users.py index d58472275d..be04f83929 100644 --- a/src/dstack/_internal/server/routers/users.py +++ b/src/dstack/_internal/server/routers/users.py @@ -76,6 +76,7 @@ async def create_user( global_role=body.global_role, email=body.email, active=body.active, + creator=user, ) return CustomORJSONResponse(users.user_model_to_user(res)) diff --git a/src/dstack/_internal/server/schemas/events.py b/src/dstack/_internal/server/schemas/events.py new file mode 100644 index 0000000000..fcd07dbba0 --- /dev/null +++ b/src/dstack/_internal/server/schemas/events.py @@ -0,0 +1,30 @@ +import uuid +from datetime import datetime +from typing import Optional +from uuid import UUID + +from pydantic import Field + +from dstack._internal.core.models.common import CoreModel +from dstack._internal.core.models.events import EventTargetType + + +class ListEventsRequest(CoreModel): + # TODO: docs + # TODO: restrict list length for filters? + # TODO: forbid contradicting filters? + target_projects: Optional[list[uuid.UUID]] = None + target_users: Optional[list[uuid.UUID]] = None + target_fleets: Optional[list[uuid.UUID]] = None + target_instances: Optional[list[uuid.UUID]] = None + target_runs: Optional[list[uuid.UUID]] = None + target_jobs: Optional[list[uuid.UUID]] = None + within_projects: Optional[list[uuid.UUID]] = None + within_fleets: Optional[list[uuid.UUID]] = None + within_runs: Optional[list[uuid.UUID]] = None + include_target_types: Optional[list[EventTargetType]] = None + actors: Optional[list[Optional[uuid.UUID]]] = None + prev_recorded_at: Optional[datetime] = None + prev_id: Optional[UUID] = None + limit: int = Field(100, ge=1, le=100) + ascending: bool = False diff --git a/src/dstack/_internal/server/services/events.py b/src/dstack/_internal/server/services/events.py new file mode 100644 index 0000000000..8d29d4c8a0 --- /dev/null +++ b/src/dstack/_internal/server/services/events.py @@ -0,0 +1,327 @@ +import uuid +from collections.abc import Iterable +from dataclasses import dataclass +from datetime import datetime +from typing import Optional, Union + +from sqlalchemy import and_, or_, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import joinedload + +from dstack._internal.core.models.events import Event, EventTarget, EventTargetType +from dstack._internal.core.models.users import GlobalRole +from dstack._internal.server import settings +from dstack._internal.server.models import ( + EventModel, + EventTargetModel, + FleetModel, + InstanceModel, + JobModel, + MemberModel, + ProjectModel, + RunModel, + UserModel, +) +from dstack._internal.utils.common import get_current_datetime + + +class SystemActor: + pass + + +@dataclass +class UserActor: + user_id: uuid.UUID + + +AnyActor = Union[SystemActor, UserActor] + + +@dataclass( + frozen=True, # to enforce the __post_init__ invariant +) +class Target: + """ + Target specification for event emission. + + **NOTE**: Prefer using `Target.from_model` to create `Target` instances, + unless you don't have a complete model available. + """ + + type: EventTargetType + project_id: Optional[uuid.UUID] + id: uuid.UUID + name: str + + def __post_init__(self): + if self.type == EventTargetType.USER and self.project_id is not None: + raise ValueError("User target cannot have project_id") + if self.type != EventTargetType.USER and self.project_id is None: + raise ValueError(f"{self.type} target must have project_id") + if self.type == EventTargetType.PROJECT and self.id != self.project_id: + raise ValueError("Project target id must be equal to project_id") + + @staticmethod + def from_model( + model: Union[ + FleetModel, + InstanceModel, + JobModel, + ProjectModel, + RunModel, + UserModel, + ], + ) -> "Target": + if isinstance(model, FleetModel): + return Target( + type=EventTargetType.FLEET, + project_id=model.project_id or model.project.id, + id=model.id, + name=model.name, + ) + if isinstance(model, InstanceModel): + return Target( + type=EventTargetType.INSTANCE, + project_id=model.project_id or model.project.id, + id=model.id, + name=model.name, + ) + if isinstance(model, JobModel): + return Target( + type=EventTargetType.JOB, + project_id=model.project_id or model.project.id, + id=model.id, + name=model.job_name, + ) + if isinstance(model, ProjectModel): + return Target( + type=EventTargetType.PROJECT, + project_id=model.id, + id=model.id, + name=model.name, + ) + if isinstance(model, RunModel): + return Target( + type=EventTargetType.RUN, + project_id=model.project_id or model.project.id, + id=model.id, + name=model.run_name, + ) + if isinstance(model, UserModel): + return Target( + type=EventTargetType.USER, + project_id=None, + id=model.id, + name=model.name, + ) + raise ValueError(f"Unsupported model type: {type(model)}") + + +def emit(session: AsyncSession, message: str, actor: AnyActor, targets: Iterable[Target]) -> None: + # TODO: docstring + best practices + # TODO: log each event + if settings.SERVER_EVENTS_TTL_SECONDS <= 0: + return + event = EventModel( + message=message, + actor_user_id=actor.user_id if isinstance(actor, UserActor) else None, + recorded_at=get_current_datetime(), + targets=[], + ) + for target in targets: + event.targets.append( + EventTargetModel( + entity_type=target.type, + entity_project_id=target.project_id, + entity_id=target.id, + entity_name=target.name, + ) + ) + if not event.targets: + raise ValueError("At least one target must be specified for an event") + session.add(event) + + +async def list_events( + session: AsyncSession, + user: UserModel, # the user requesting the events + target_projects: Optional[list[uuid.UUID]], + target_users: Optional[list[uuid.UUID]], + target_fleets: Optional[list[uuid.UUID]], + target_instances: Optional[list[uuid.UUID]], + target_runs: Optional[list[uuid.UUID]], + target_jobs: Optional[list[uuid.UUID]], + within_projects: Optional[list[uuid.UUID]], + within_fleets: Optional[list[uuid.UUID]], + within_runs: Optional[list[uuid.UUID]], + include_target_types: Optional[list[EventTargetType]], + actors: Optional[list[Optional[uuid.UUID]]], + prev_recorded_at: Optional[datetime], + prev_id: Optional[uuid.UUID], + limit: int, + ascending: bool, +) -> list[Event]: + filters = [] + if user.global_role != GlobalRole.ADMIN: + filters.append( + or_( + EventTargetModel.entity_project_id.in_( + select(MemberModel.project_id).where(MemberModel.user_id == user.id) + ), + and_( + EventTargetModel.entity_project_id.is_(None), + EventTargetModel.entity_type == EventTargetType.USER, + EventTargetModel.entity_id == user.id, + ), + ) + ) + if target_projects is not None: + filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.PROJECT, + EventTargetModel.entity_id.in_(target_projects), + ) + ) + if target_users is not None: + filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.USER, + EventTargetModel.entity_id.in_(target_users), + ) + ) + if target_fleets is not None: + filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.FLEET, + EventTargetModel.entity_id.in_(target_fleets), + ) + ) + if target_instances is not None: + filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.INSTANCE, + EventTargetModel.entity_id.in_(target_instances), + ) + ) + if target_runs is not None: + filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.RUN, + EventTargetModel.entity_id.in_(target_runs), + ) + ) + if target_jobs is not None: + filters.append( + and_( + EventTargetModel.entity_type == EventTargetType.JOB, + EventTargetModel.entity_id.in_(target_jobs), + ) + ) + if within_projects is not None: + filters.append(EventTargetModel.entity_project_id.in_(within_projects)) + if within_fleets is not None: + filters.append( + or_( + and_( + EventTargetModel.entity_type == EventTargetType.FLEET, + EventTargetModel.entity_id.in_(within_fleets), + ), + and_( + EventTargetModel.entity_type == EventTargetType.INSTANCE, + EventTargetModel.entity_id.in_( + select(InstanceModel.id).where(InstanceModel.fleet_id.in_(within_fleets)) + ), + ), + ) + ) + if within_runs is not None: + filters.append( + or_( + and_( + EventTargetModel.entity_type == EventTargetType.RUN, + EventTargetModel.entity_id.in_(within_runs), + ), + and_( + EventTargetModel.entity_type == EventTargetType.JOB, + EventTargetModel.entity_id.in_( + select(JobModel.id).where(JobModel.run_id.in_(within_runs)) + ), + ), + ) + ) + if include_target_types is not None: + filters.append(EventTargetModel.entity_type.in_(include_target_types)) + if actors is not None: + filters.append( + or_( + EventModel.actor_user_id.is_(None) if None in actors else False, + EventModel.actor_user_id.in_( + [actor_id for actor_id in actors if actor_id is not None] + ), + ) + ) + if prev_recorded_at is not None: + if ascending: + if prev_id is None: + filters.append(EventModel.recorded_at > prev_recorded_at) + else: + filters.append( + or_( + EventModel.recorded_at > prev_recorded_at, + and_(EventModel.recorded_at == prev_recorded_at, EventModel.id < prev_id), + ) + ) + else: + if prev_id is None: + filters.append(EventModel.recorded_at < prev_recorded_at) + else: + filters.append( + or_( + EventModel.recorded_at < prev_recorded_at, + and_(EventModel.recorded_at == prev_recorded_at, EventModel.id > prev_id), + ) + ) + order_by = (EventModel.recorded_at.desc(), EventModel.id) + if ascending: + order_by = (EventModel.recorded_at.asc(), EventModel.id.desc()) + query = ( + select(EventModel) + .order_by(*order_by) + .limit(limit) + .options( + joinedload(EventModel.targets), + joinedload(EventModel.user).load_only(UserModel.name), + ) + ) + if filters: + # Apply filters in a subquery, since it requires joining events with targets. + # Can't join in the outer query, as it results in LIMIT being applied to targets + # instead of events. + event_ids_subquery = ( + select(EventModel.id).join(EventModel.targets).where(*filters).distinct() + ) + query = query.where(EventModel.id.in_(event_ids_subquery)) + res = await session.execute(query) + event_models = res.unique().scalars().all() + return list(map(event_model_to_event, event_models)) + + +def event_model_to_event(event_model: EventModel) -> Event: + targets = [ + EventTarget( + type=target.entity_type, + project_id=target.entity_project_id, + id=target.entity_id, + name=target.entity_name, + ) + for target in event_model.targets + ] + + return Event( + id=event_model.id, + message=event_model.message, + recorded_at=event_model.recorded_at, + actor_user_id=event_model.actor_user_id, + actor_user=event_model.user.name if event_model.user else None, + targets=targets, + ) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 3a5329f6e4..110288c425 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -53,6 +53,7 @@ ProjectModel, UserModel, ) +from dstack._internal.server.services import events from dstack._internal.server.services import instances as instances_services from dstack._internal.server.services import offers as offers_services from dstack._internal.server.services.instances import ( @@ -752,6 +753,12 @@ async def _create_fleet( instances=[], ) session.add(fleet_model) + events.emit( + session, + f"Fleet created. Status: {fleet_model.status.upper()}", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(fleet_model)], + ) if spec.configuration.ssh_config is not None: for i, host in enumerate(spec.configuration.ssh_config.hosts): instances_model = await create_fleet_ssh_instance_model( @@ -773,6 +780,13 @@ async def _create_fleet( instance_num=i, ) fleet_model.instances.append(instance_model) + for instance_model in fleet_model.instances: + events.emit( + session, + f"Instance created on fleet submission. Status: {instance_model.status.upper()}", + actor=events.SystemActor(), + targets=[events.Target.from_model(instance_model)], + ) await session.commit() return fleet_model_to_fleet(fleet_model) diff --git a/src/dstack/_internal/server/services/projects.py b/src/dstack/_internal/server/services/projects.py index 330fcceb49..fcb781f5ce 100644 --- a/src/dstack/_internal/server/services/projects.py +++ b/src/dstack/_internal/server/services/projects.py @@ -30,7 +30,7 @@ VolumeModel, ) from dstack._internal.server.schemas.projects import MemberSetting -from dstack._internal.server.services import users +from dstack._internal.server.services import events, users from dstack._internal.server.services.backends import ( get_backend_config_without_creds_from_backend_model, ) @@ -524,6 +524,12 @@ async def create_project_model( is_public=is_public, ) session.add(project) + events.emit( + session, + "Project created", + actor=events.UserActor(owner.id), + targets=[events.Target.from_model(project)], + ) await session.commit() return project diff --git a/src/dstack/_internal/server/services/runs/__init__.py b/src/dstack/_internal/server/services/runs/__init__.py index cacda8e180..6bf8e8dfef 100644 --- a/src/dstack/_internal/server/services/runs/__init__.py +++ b/src/dstack/_internal/server/services/runs/__init__.py @@ -47,8 +47,8 @@ RunModel, UserModel, ) +from dstack._internal.server.services import events, services from dstack._internal.server.services import repos as repos_services -from dstack._internal.server.services import services from dstack._internal.server.services.jobs import ( check_can_attach_job_volumes, delay_job_instance_termination, @@ -484,6 +484,12 @@ async def submit_run( next_triggered_at=_get_next_triggered_at(run_spec), ) session.add(run_model) + events.emit( + session, + f"Run submitted. Status: {run_model.status.upper()}", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(run_model)], + ) if run_spec.configuration.type == "service": await services.register_service(session, run_model, run_spec) @@ -501,6 +507,14 @@ async def submit_run( status=JobStatus.SUBMITTED, ) session.add(job_model) + events.emit( + session, + f"Job created on run submission. Status: {job_model.status.upper()}", + actor=events.SystemActor(), + targets=[ + events.Target.from_model(job_model), + ], + ) await session.commit() await session.refresh(run_model) diff --git a/src/dstack/_internal/server/services/users.py b/src/dstack/_internal/server/services/users.py index 9fdbe3b4e8..2012f99986 100644 --- a/src/dstack/_internal/server/services/users.py +++ b/src/dstack/_internal/server/services/users.py @@ -18,6 +18,7 @@ UserWithCreds, ) from dstack._internal.server.models import DecryptedString, UserModel +from dstack._internal.server.services import events from dstack._internal.server.services.permissions import get_default_permissions from dstack._internal.server.utils.routers import error_forbidden from dstack._internal.utils import crypto @@ -81,6 +82,7 @@ async def create_user( active: bool = True, token: Optional[str] = None, config: Optional[UserHookConfig] = None, + creator: Optional[UserModel] = None, ) -> UserModel: validate_username(username) user_model = await get_user_model_by_name(session=session, username=username, ignore_case=True) @@ -101,6 +103,12 @@ async def create_user( ssh_public_key=public_bytes.decode(), ) session.add(user) + events.emit( + session, + "User created", + actor=events.UserActor(creator.id) if creator else events.UserActor(user.id), + targets=[events.Target.from_model(user)], + ) await session.commit() for func in _CREATE_USER_HOOKS: await func(session, user, config) diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index 0730db7f4a..f0bdc96017 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -106,6 +106,9 @@ os.getenv("DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS", 60) ) +# TODO: docs +SERVER_EVENTS_TTL_SECONDS = int(os.getenv("DSTACK_SERVER_EVENTS_TTL_SECONDS", 2 * 30 * 24 * 3600)) + SERVER_KEEP_SHIM_TASKS = os.getenv("DSTACK_SERVER_KEEP_SHIM_TASKS") is not None DEFAULT_PROJECT_NAME = "main" diff --git a/src/tests/_internal/server/background/tasks/test_process_events.py b/src/tests/_internal/server/background/tasks/test_process_events.py new file mode 100644 index 0000000000..5e344131ef --- /dev/null +++ b/src/tests/_internal/server/background/tasks/test_process_events.py @@ -0,0 +1,49 @@ +from datetime import datetime +from unittest.mock import patch + +import pytest +from freezegun import freeze_time +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.server import settings +from dstack._internal.server.background.tasks.process_events import delete_events +from dstack._internal.server.models import EventModel +from dstack._internal.server.services import events +from dstack._internal.server.testing.common import create_user + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +async def test_deletes_old_events(test_db, session: AsyncSession) -> None: + user = await create_user(session=session) + for i in range(10): + with freeze_time(datetime(2026, 1, 1, i)): + events.emit( + session, + message=f"Event {i}", + actor=events.UserActor(user_id=user.id), + targets=[events.Target.from_model(user)], + ) + await session.commit() + + res = await session.execute(select(EventModel)) + all_events = res.scalars().all() + assert len(all_events) == 10 + + with ( + patch.multiple(settings, SERVER_EVENTS_TTL_SECONDS=5 * 3600), + freeze_time(datetime(2026, 1, 1, 10)), + ): + await delete_events() + + res = await session.execute(select(EventModel).order_by(EventModel.recorded_at)) + remaining_events = res.scalars().all() + assert len(remaining_events) == 5 + assert [e.message for e in remaining_events] == [ + "Event 5", + "Event 6", + "Event 7", + "Event 8", + "Event 9", + ] diff --git a/src/tests/_internal/server/routers/test_events.py b/src/tests/_internal/server/routers/test_events.py new file mode 100644 index 0000000000..5add5fd632 --- /dev/null +++ b/src/tests/_internal/server/routers/test_events.py @@ -0,0 +1,1135 @@ +from datetime import datetime + +import pytest +from freezegun import freeze_time +from httpx import AsyncClient +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.core.models.users import GlobalRole, ProjectRole +from dstack._internal.server.services import events +from dstack._internal.server.services.projects import add_project_member +from dstack._internal.server.testing.common import ( + create_fleet, + create_instance, + create_job, + create_project, + create_repo, + create_run, + create_user, + get_auth_headers, +) + +pytestmark = [ + pytest.mark.asyncio, + pytest.mark.usefixtures("test_db", "image_config_mock"), + pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True), +] + + +class TestListEventsAccessControl: + async def test_user_sees_events_about_themselves( + self, session: AsyncSession, client: AsyncClient + ) -> None: + admin_user = await create_user( + session=session, + name="admin", + global_role=GlobalRole.ADMIN, + ) + regular_user = await create_user( + session=session, + name="regular", + global_role=GlobalRole.USER, + ) + events.emit( + session, + "User created", + actor=events.UserActor(admin_user.id), + targets=[events.Target.from_model(admin_user)], + ) + events.emit( + session, + "User created", + actor=events.UserActor(admin_user.id), + targets=[events.Target.from_model(regular_user)], + ) + await session.commit() + + # Regular user only sees the event about themselves + resp = await client.post( + "/api/events/list", headers=get_auth_headers(regular_user.token), json={} + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(regular_user.id) + + # Admin sees all events + resp = await client.post( + "/api/events/list", headers=get_auth_headers(admin_user.token), json={} + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_user_sees_events_within_their_project( + self, session: AsyncSession, client: AsyncClient + ) -> None: + admin_user = await create_user( + session=session, + name="admin", + global_role=GlobalRole.ADMIN, + ) + regular_user = await create_user( + session=session, + name="regular", + global_role=GlobalRole.USER, + ) + admin_project = await create_project( + session=session, + name="admin", + owner=admin_user, + ) + regular_project = await create_project( + session=session, + name="regular", + owner=regular_user, + ) + await add_project_member( + session=session, + project=admin_project, + user=admin_user, + project_role=ProjectRole.ADMIN, + ) + await add_project_member( + session=session, + project=regular_project, + user=regular_user, + project_role=ProjectRole.USER, + ) + admin_fleet = await create_fleet( + session=session, + project=admin_project, + name="admin", + ) + regular_fleet = await create_fleet( + session=session, + project=regular_project, + name="regular", + ) + events.emit( + session, + "Project created", + actor=events.UserActor(admin_user.id), + targets=[events.Target.from_model(admin_project)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor(admin_user.id), + targets=[events.Target.from_model(regular_project)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(admin_user.id), + targets=[events.Target.from_model(admin_fleet)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(admin_user.id), + targets=[events.Target.from_model(regular_fleet)], + ) + await session.commit() + + # Regular user only sees the events within their project + resp = await client.post( + "/api/events/list", headers=get_auth_headers(regular_user.token), json={} + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert {resp.json()[0]["targets"][0]["id"], resp.json()[1]["targets"][0]["id"]} == { + str(regular_project.id), + str(regular_fleet.id), + } + + # Admin sees all events + resp = await client.post( + "/api/events/list", headers=get_auth_headers(admin_user.token), json={} + ) + resp.raise_for_status() + assert len(resp.json()) == 4 + + async def test_filters_do_not_bypass_access_control( + self, session: AsyncSession, client: AsyncClient + ) -> None: + admin = await create_user( + session=session, + name="admin", + global_role=GlobalRole.ADMIN, + ) + user = await create_user(session=session, global_role=GlobalRole.USER) + project = await create_project(session=session) + fleet = await create_fleet(session=session, project=project) + events.emit( + session, + "Project created", + actor=events.UserActor(admin.id), + targets=[events.Target.from_model(project)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(admin.id), + targets=[events.Target.from_model(fleet)], + ) + await session.commit() + + # Regular user can't see events from a project they are not a member of + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_projects": [str(project.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 0 + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_projects": [str(project.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 0 + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_fleets": [str(fleet.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 0 + + # Admin can see the events + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(admin.token), + json={"within_projects": [str(project.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(admin.token), + json={"target_projects": [str(project.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(admin.token), + json={"target_fleets": [str(fleet.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + + +class TestListEventsFilters: + async def test_target_projects(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project_a = await create_project(session=session, name="project_a", owner=user) + project_b = await create_project(session=session, name="project_b", owner=user) + fleet_a = await create_fleet(session=session, project=project_a) + events.emit( + session, + "User created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(user)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(project_a)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(project_b)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(fleet_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_projects": [str(project_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(project_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_projects": [str(project_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(project_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_projects": [str(project_a.id), str(project_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_users(self, session: AsyncSession, client: AsyncClient) -> None: + user_a = await create_user(session=session, name="user_a") + user_b = await create_user(session=session, name="user_b") + project_a = await create_project(session=session, name="project_a", owner=user_a) + events.emit( + session, + "User created", + actor=events.UserActor(user_a.id), + targets=[events.Target.from_model(user_a)], + ) + events.emit( + session, + "User created", + actor=events.UserActor(user_b.id), + targets=[events.Target.from_model(user_b)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor(user_a.id), + targets=[events.Target.from_model(project_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"target_users": [str(user_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(user_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_b.token), + json={"target_users": [str(user_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(user_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"target_users": [str(user_a.id), str(user_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_fleets(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet_a = await create_fleet( + session=session, + project=project, + name="fleet_a", + ) + fleet_b = await create_fleet( + session=session, + project=project, + name="fleet_b", + ) + instance_a = await create_instance( + session=session, + project=project, + fleet=fleet_a, + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(fleet_a)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(fleet_b)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(instance_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_fleets": [str(fleet_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(fleet_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_fleets": [str(fleet_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(fleet_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_fleets": [str(fleet_a.id), str(fleet_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_instances(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project) + instance_a = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + instance_b = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(fleet)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(instance_a)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(instance_b)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(instance_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(instance_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_a.id), str(instance_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_runs(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + run_a = await create_run( + session=session, + project=project, + run_name="run_a", + repo=repo, + user=user, + ) + run_b = await create_run( + session=session, + project=project, + run_name="run_b", + repo=repo, + user=user, + ) + job_a = await create_job( + session=session, + run=run_a, + ) + events.emit( + session, + "Run created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(run_a)], + ) + events.emit( + session, + "Run created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(run_b)], + ) + events.emit( + session, + "Job created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(job_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_runs": [str(run_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(run_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_runs": [str(run_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(run_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_runs": [str(run_a.id), str(run_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_target_jobs(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run( + session=session, + project=project, + run_name="run", + repo=repo, + user=user, + ) + job_a = await create_job( + session=session, + run=run, + ) + job_b = await create_job( + session=session, + run=run, + ) + events.emit( + session, + "Run created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(run)], + ) + events.emit( + session, + "Job created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(job_a)], + ) + events.emit( + session, + "Job created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(job_b)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_jobs": [str(job_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(job_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_jobs": [str(job_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(job_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_jobs": [str(job_a.id), str(job_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + async def test_within_projects(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project_a = await create_project(session=session, name="project_a", owner=user) + project_b = await create_project(session=session, name="project_b", owner=user) + fleet_a = await create_fleet(session=session, project=project_a) + instance_a = await create_instance( + session=session, + project=project_a, + fleet=fleet_a, + ) + events.emit( + session, + "User created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(user)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(project_a)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(project_b)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(fleet_a)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(instance_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_projects": [str(project_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 3 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_projects": [str(project_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_projects": [str(project_a.id), str(project_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 4 + + async def test_within_fleets(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet_a = await create_fleet( + session=session, + project=project, + name="fleet_a", + ) + fleet_b = await create_fleet( + session=session, + project=project, + name="fleet_b", + ) + isinstance_a = await create_instance( + session=session, + project=project, + fleet=fleet_a, + ) + events.emit( + session, + "Project created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(project)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(fleet_a)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(fleet_b)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(isinstance_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_fleets": [str(fleet_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_fleets": [str(fleet_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_fleets": [str(fleet_a.id), str(fleet_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 3 + + async def test_within_runs(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo(session=session, project_id=project.id) + run_a = await create_run( + session=session, + project=project, + run_name="run_a", + repo=repo, + user=user, + ) + run_b = await create_run( + session=session, + project=project, + run_name="run_b", + repo=repo, + user=user, + ) + job_a = await create_job( + session=session, + run=run_a, + ) + events.emit( + session, + "Project created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(project)], + ) + events.emit( + session, + "Run created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(run_a)], + ) + events.emit( + session, + "Run created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(run_b)], + ) + events.emit( + session, + "Job created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(job_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_runs": [str(run_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_runs": [str(run_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"within_runs": [str(run_a.id), str(run_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 3 + + async def test_include_target_types(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project) + instance = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + events.emit( + session, + "Project created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(project)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(fleet)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(instance)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"include_target_types": ["fleet"]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["type"] == "fleet" + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"include_target_types": ["instance"]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["type"] == "instance" + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"include_target_types": ["project", "fleet"]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert {resp.json()[0]["targets"][0]["type"], resp.json()[1]["targets"][0]["type"]} == { + "project", + "fleet", + } + + async def test_actors(self, session: AsyncSession, client: AsyncClient) -> None: + user_a = await create_user(session=session, name="user_a") + user_b = await create_user(session=session, name="user_b") + project_a = await create_project(session=session, owner=user_a, name="project_a") + project_b = await create_project(session=session, owner=user_b, name="project_b") + events.emit( + session, + "Project created", + actor=events.UserActor(user_a.id), + targets=[events.Target.from_model(project_a)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor(user_b.id), + targets=[events.Target.from_model(project_b)], + ) + events.emit( + session, + "Project updated", + actor=events.SystemActor(), + targets=[events.Target.from_model(project_a)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"actors": [str(user_a.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Project created" + assert resp.json()[0]["targets"][0]["id"] == str(project_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"actors": [str(user_b.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Project created" + assert resp.json()[0]["targets"][0]["id"] == str(project_b.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"actors": [None]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Project updated" + assert resp.json()[0]["targets"][0]["id"] == str(project_a.id) + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user_a.token), + json={"actors": [str(user_a.id), None]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert {resp.json()[0]["targets"][0]["id"], resp.json()[1]["targets"][0]["id"]} == { + str(project_a.id) + } + + async def test_event_included_if_at_least_one_target_is_within_filters( + self, session: AsyncSession, client: AsyncClient + ) -> None: + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + fleet = await create_fleet(session=session, project=project) + instance_a = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + instance_b = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + events.emit( + session, + "Fleet instances created", + actor=events.UserActor(user.id), + targets=[ + events.Target.from_model(instance_a), + events.Target.from_model(instance_b), + ], + ) + instance_c = await create_instance( + session=session, + project=project, + fleet=fleet, + ) + events.emit( + session, + "Instance created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(instance_c)], + ) + await session.commit() + + for target_instances in [[instance_a.id], [instance_b.id], [instance_a.id, instance_b.id]]: + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": list(map(str, target_instances))}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Fleet instances created" + assert len(resp.json()[0]["targets"]) == 2 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_c.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Instance created" + assert len(resp.json()[0]["targets"]) == 1 + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={"target_instances": [str(instance_a.id), str(instance_c.id)]}, + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + + +class TestListEventsPagination: + @pytest.mark.parametrize("ascending", [True, False]) + async def test_pagination( + self, session: AsyncSession, client: AsyncClient, ascending: bool + ) -> None: + users = [] + for i in range(5): + user = await create_user(session=session, name=f"user_{i}") + users.append(user) + with freeze_time(datetime(2026, 1, 1, 12, 0, 0, i)): + events.emit( + session, + "User created", + actor=events.UserActor(user.id), + targets=[events.Target.from_model(user)], + ) + await session.commit() + + if not ascending: + users.reverse() + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": ascending, + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert resp.json()[0]["targets"][0]["name"] == users[0].name + assert resp.json()[1]["targets"][0]["name"] == users[1].name + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": ascending, + "prev_id": resp.json()[-1]["id"], + "prev_recorded_at": resp.json()[-1]["recorded_at"], + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 2 + assert resp.json()[0]["targets"][0]["name"] == users[2].name + assert resp.json()[1]["targets"][0]["name"] == users[3].name + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": ascending, + "prev_id": resp.json()[-1]["id"], + "prev_recorded_at": resp.json()[-1]["recorded_at"], + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["name"] == users[4].name + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": ascending, + "prev_id": resp.json()[-1]["id"], + "prev_recorded_at": resp.json()[-1]["recorded_at"], + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 0 + + async def test_limits_events_regardless_number_of_targets( + self, session: AsyncSession, client: AsyncClient + ) -> None: + users = [await create_user(session=session, name=f"user_{i}") for i in range(3)] + with freeze_time(datetime(2026, 1, 1, 12, 0, 0, 0)): + events.emit( + session, + "Users batch created", + actor=events.SystemActor(), + targets=[events.Target.from_model(users[0]), events.Target.from_model(users[1])], + ) + with freeze_time(datetime(2026, 1, 1, 12, 0, 0, 1)): + events.emit( + session, + "User created", + actor=events.SystemActor(), + targets=[events.Target.from_model(users[2])], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + json={ + "limit": 1, + "ascending": True, + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "Users batch created" + assert len(resp.json()[0]["targets"]) == 2 + assert {resp.json()[0]["targets"][0]["id"], resp.json()[0]["targets"][1]["id"]} == { + str(users[0].id), + str(users[1].id), + } + + resp = await client.post( + "/api/events/list", + json={ + "limit": 1, + "ascending": True, + "prev_id": resp.json()[-1]["id"], + "prev_recorded_at": resp.json()[-1]["recorded_at"], + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["message"] == "User created" + assert len(resp.json()[0]["targets"]) == 1 + assert resp.json()[0]["targets"][0]["id"] == str(users[2].id) + + resp = await client.post( + "/api/events/list", + json={ + "limit": 2, + "ascending": True, + }, + headers=get_auth_headers(users[0].token), + ) + resp.raise_for_status() + assert len(resp.json()) == 2 From 2fadf4a4a5b3e7cf7544e83f0c2dbbad1e0e7fcf Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Fri, 5 Dec 2025 01:50:23 +0100 Subject: [PATCH 2/3] Misc. updates - DB query and schema optimizations - Event logging - `DSTACK_FF_EVENTS` feature flag - More input validation - More unit tests - Various docs --- docs/docs/reference/environment-variables.md | 6 + src/dstack/_internal/core/models/events.py | 54 +++- src/dstack/_internal/server/app.py | 4 +- .../_internal/server/background/__init__.py | 4 +- ...74df9897e_add_events_and_event_targets.py} | 18 +- .../5fd659afca82_add_ix_instances_fleet_id.py | 31 +++ .../d4d9dc26cf58_add_ix_jobs_run_id.py | 31 +++ src/dstack/_internal/server/models.py | 18 +- src/dstack/_internal/server/routers/events.py | 7 + src/dstack/_internal/server/schemas/events.py | 179 ++++++++++-- .../_internal/server/services/events.py | 170 +++++++++--- .../_internal/server/services/fleets.py | 2 +- .../_internal/server/services/logging.py | 15 +- .../_internal/server/services/projects.py | 2 +- .../server/services/runs/__init__.py | 2 +- src/dstack/_internal/server/services/users.py | 2 +- src/dstack/_internal/server/settings.py | 6 +- src/dstack/_internal/settings.py | 3 + .../background/tasks/test_process_events.py | 9 +- .../_internal/server/routers/test_events.py | 255 ++++++++++++++---- 20 files changed, 673 insertions(+), 145 deletions(-) rename src/dstack/_internal/server/migrations/versions/{f27d4a29cd38_add_events_and_event_targets.py => 22d74df9897e_add_events_and_event_targets.py} (86%) create mode 100644 src/dstack/_internal/server/migrations/versions/5fd659afca82_add_ix_instances_fleet_id.py create mode 100644 src/dstack/_internal/server/migrations/versions/d4d9dc26cf58_add_ix_jobs_run_id.py diff --git a/docs/docs/reference/environment-variables.md b/docs/docs/reference/environment-variables.md index 31a94e903a..04ee67d93c 100644 --- a/docs/docs/reference/environment-variables.md +++ b/docs/docs/reference/environment-variables.md @@ -132,6 +132,12 @@ For more details on the options below, refer to the [server deployment](../guide - `DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_TTL_SECONDS } – Maximum age of instance health checks. - `DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS`{ #DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS } – Minimum time interval between consecutive health checks of the same instance. + + ??? info "Internal environment variables" The following environment variables are intended for development purposes: diff --git a/src/dstack/_internal/core/models/events.py b/src/dstack/_internal/core/models/events.py index fce57e87fb..f40854892f 100644 --- a/src/dstack/_internal/core/models/events.py +++ b/src/dstack/_internal/core/models/events.py @@ -1,9 +1,9 @@ -# TODO: docs - import uuid from datetime import datetime from enum import Enum -from typing import Optional +from typing import Annotated, Optional + +from pydantic import Field from dstack._internal.core.models.common import CoreModel @@ -18,16 +18,50 @@ class EventTargetType(str, Enum): class EventTarget(CoreModel): - type: str # Holds EventTargetType; str for adding new types without breaking compatibility - project_id: Optional[uuid.UUID] - id: uuid.UUID - name: str + type: Annotated[ + str, # not using EventTargetType to allow adding new types without breaking compatibility + Field( + description=( + f"Type of the target entity." + f" One of: {', '.join([f'`{t}`' for t in EventTargetType])}" + ) + ), + ] + project_id: Annotated[ + Optional[uuid.UUID], + Field( + description=( + "ID of the project the target entity belongs to," + " or `null` for target types not bound to a project (e.g., users)" + ) + ), + ] + id: Annotated[uuid.UUID, Field(description="ID of the target entity")] + name: Annotated[str, Field(description="Name of the target entity")] class Event(CoreModel): id: uuid.UUID message: str recorded_at: datetime - actor_user_id: Optional[uuid.UUID] - actor_user: Optional[str] - targets: list[EventTarget] + actor_user_id: Annotated[ + Optional[uuid.UUID], + Field( + description=( + "ID of the user who performed the action that triggered the event," + " or `null` if the action was performed by the system" + ) + ), + ] + actor_user: Annotated[ + Optional[str], + Field( + description=( + "Name of the user who performed the action that triggered the event," + " or `null` if the action was performed by the system" + ) + ), + ] + targets: Annotated[ + list[EventTarget], Field(description="List of entities affected by the event") + ] diff --git a/src/dstack/_internal/server/app.py b/src/dstack/_internal/server/app.py index 736733b403..5382e8f113 100644 --- a/src/dstack/_internal/server/app.py +++ b/src/dstack/_internal/server/app.py @@ -66,7 +66,7 @@ get_client_version, get_server_client_error_details, ) -from dstack._internal.settings import DSTACK_VERSION +from dstack._internal.settings import DSTACK_VERSION, FeatureFlags from dstack._internal.utils.logging import get_logger from dstack._internal.utils.ssh import check_required_ssh_version @@ -229,7 +229,7 @@ def register_routes(app: FastAPI, ui: bool = True): app.include_router(model_proxy.router, prefix="/proxy/models", tags=["model-proxy"]) app.include_router(prometheus.router) app.include_router(files.router) - app.include_router(events.root_router) + app.include_router(events.root_router, include_in_schema=FeatureFlags.EVENTS) @app.exception_handler(ForbiddenError) async def forbidden_error_handler(request: Request, exc: ForbiddenError): diff --git a/src/dstack/_internal/server/background/__init__.py b/src/dstack/_internal/server/background/__init__.py index 85af7d3315..2c1a42e857 100644 --- a/src/dstack/_internal/server/background/__init__.py +++ b/src/dstack/_internal/server/background/__init__.py @@ -33,6 +33,7 @@ process_terminating_jobs, ) from dstack._internal.server.background.tasks.process_volumes import process_submitted_volumes +from dstack._internal.settings import FeatureFlags _scheduler = AsyncIOScheduler() @@ -70,7 +71,8 @@ def start_background_tasks() -> AsyncIOScheduler: _scheduler.add_job(process_probes, IntervalTrigger(seconds=3, jitter=1)) _scheduler.add_job(collect_metrics, IntervalTrigger(seconds=10), max_instances=1) _scheduler.add_job(delete_metrics, IntervalTrigger(minutes=5), max_instances=1) - _scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1) + if FeatureFlags.EVENTS: + _scheduler.add_job(delete_events, IntervalTrigger(minutes=7), max_instances=1) if settings.ENABLE_PROMETHEUS_METRICS: _scheduler.add_job( collect_prometheus_metrics, IntervalTrigger(seconds=10), max_instances=1 diff --git a/src/dstack/_internal/server/migrations/versions/f27d4a29cd38_add_events_and_event_targets.py b/src/dstack/_internal/server/migrations/versions/22d74df9897e_add_events_and_event_targets.py similarity index 86% rename from src/dstack/_internal/server/migrations/versions/f27d4a29cd38_add_events_and_event_targets.py rename to src/dstack/_internal/server/migrations/versions/22d74df9897e_add_events_and_event_targets.py index 38d41ab4d6..87a48deba0 100644 --- a/src/dstack/_internal/server/migrations/versions/f27d4a29cd38_add_events_and_event_targets.py +++ b/src/dstack/_internal/server/migrations/versions/22d74df9897e_add_events_and_event_targets.py @@ -1,8 +1,8 @@ """Add events and event_targets -Revision ID: f27d4a29cd38 -Revises: 7d1ec2b920ac -Create Date: 2025-11-26 01:01:46.305815 +Revision ID: 22d74df9897e +Revises: 5fd659afca82 +Create Date: 2025-12-04 20:56:08.003504 """ @@ -13,8 +13,8 @@ import dstack._internal.server.models # revision identifiers, used by Alembic. -revision = "f27d4a29cd38" -down_revision = "7d1ec2b920ac" +revision = "22d74df9897e" +down_revision = "5fd659afca82" branch_labels = None depends_on = None @@ -68,9 +68,15 @@ def upgrade() -> None: sa.PrimaryKeyConstraint("id", name=op.f("pk_event_targets")), ) with op.batch_alter_table("event_targets", schema=None) as batch_op: + batch_op.create_index( + batch_op.f("ix_event_targets_entity_id"), ["entity_id"], unique=False + ) batch_op.create_index( batch_op.f("ix_event_targets_entity_project_id"), ["entity_project_id"], unique=False ) + batch_op.create_index( + batch_op.f("ix_event_targets_entity_type"), ["entity_type"], unique=False + ) batch_op.create_index(batch_op.f("ix_event_targets_event_id"), ["event_id"], unique=False) # ### end Alembic commands ### @@ -80,7 +86,9 @@ def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### with op.batch_alter_table("event_targets", schema=None) as batch_op: batch_op.drop_index(batch_op.f("ix_event_targets_event_id")) + batch_op.drop_index(batch_op.f("ix_event_targets_entity_type")) batch_op.drop_index(batch_op.f("ix_event_targets_entity_project_id")) + batch_op.drop_index(batch_op.f("ix_event_targets_entity_id")) op.drop_table("event_targets") with op.batch_alter_table("events", schema=None) as batch_op: diff --git a/src/dstack/_internal/server/migrations/versions/5fd659afca82_add_ix_instances_fleet_id.py b/src/dstack/_internal/server/migrations/versions/5fd659afca82_add_ix_instances_fleet_id.py new file mode 100644 index 0000000000..4e9467a7cf --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/5fd659afca82_add_ix_instances_fleet_id.py @@ -0,0 +1,31 @@ +"""Add ix_instances_fleet_id + +Revision ID: 5fd659afca82 +Revises: d4d9dc26cf58 +Create Date: 2025-12-04 20:52:07.015334 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "5fd659afca82" +down_revision = "d4d9dc26cf58" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_instances_fleet_id"), ["fleet_id"], unique=False) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("instances", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_instances_fleet_id")) + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/migrations/versions/d4d9dc26cf58_add_ix_jobs_run_id.py b/src/dstack/_internal/server/migrations/versions/d4d9dc26cf58_add_ix_jobs_run_id.py new file mode 100644 index 0000000000..b3d485a075 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/d4d9dc26cf58_add_ix_jobs_run_id.py @@ -0,0 +1,31 @@ +"""Add ix_jobs_run_id + +Revision ID: d4d9dc26cf58 +Revises: 006512f572b4 +Create Date: 2025-12-04 20:48:10.543248 + +""" + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "d4d9dc26cf58" +down_revision = "006512f572b4" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.create_index(batch_op.f("ix_jobs_run_id"), ["run_id"], unique=False) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_jobs_run_id")) + + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index ca9d045d9e..9e6adc1c98 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -408,7 +408,9 @@ class JobModel(BaseModel): project_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE")) project: Mapped["ProjectModel"] = relationship() - run_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("runs.id", ondelete="CASCADE")) + run_id: Mapped[uuid.UUID] = mapped_column( + ForeignKey("runs.id", ondelete="CASCADE"), index=True + ) run: Mapped["RunModel"] = relationship() # Jobs need to reference fleets because we may choose an optimal fleet for a master job @@ -602,7 +604,7 @@ class InstanceModel(BaseModel): ) pool: Mapped[Optional["PoolModel"]] = relationship(back_populates="instances") - fleet_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("fleets.id")) + fleet_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("fleets.id"), index=True) fleet: Mapped[Optional["FleetModel"]] = relationship(back_populates="instances") compute_group_id: Mapped[Optional[uuid.UUID]] = mapped_column(ForeignKey("compute_groups.id")) @@ -858,16 +860,14 @@ class SecretModel(BaseModel): class EventModel(BaseModel): __tablename__ = "events" - id: Mapped[uuid.UUID] = mapped_column( - UUIDType(binary=False), primary_key=True, default=uuid.uuid4 - ) + id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False), primary_key=True) message: Mapped[str] = mapped_column(Text) recorded_at: Mapped[datetime] = mapped_column(NaiveDateTime, index=True) actor_user_id: Mapped[Optional[uuid.UUID]] = mapped_column( ForeignKey("users.id", ondelete="CASCADE"), nullable=True, index=True ) - user: Mapped[Optional["UserModel"]] = relationship() + actor_user: Mapped[Optional["UserModel"]] = relationship() targets: Mapped[List["EventTargetModel"]] = relationship(back_populates="event") @@ -889,6 +889,8 @@ class EventTargetModel(BaseModel): ) entity_project: Mapped[Optional["ProjectModel"]] = relationship() - entity_type: Mapped[EventTargetType] = mapped_column(EnumAsString(EventTargetType, 100)) - entity_id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False)) + entity_type: Mapped[EventTargetType] = mapped_column( + EnumAsString(EventTargetType, 100), index=True + ) + entity_id: Mapped[uuid.UUID] = mapped_column(UUIDType(binary=False), index=True) entity_name: Mapped[str] = mapped_column(String(200)) diff --git a/src/dstack/_internal/server/routers/events.py b/src/dstack/_internal/server/routers/events.py index e4a55e461b..b5472aa0d3 100644 --- a/src/dstack/_internal/server/routers/events.py +++ b/src/dstack/_internal/server/routers/events.py @@ -2,6 +2,7 @@ from sqlalchemy.ext.asyncio import AsyncSession import dstack._internal.server.services.events as events_services +from dstack._internal.core.errors import ServerClientError from dstack._internal.core.models.events import Event from dstack._internal.server.db import get_session from dstack._internal.server.models import UserModel @@ -11,6 +12,7 @@ CustomORJSONResponse, get_base_api_additional_responses, ) +from dstack._internal.settings import FeatureFlags root_router = APIRouter( prefix="/api/events", @@ -28,9 +30,14 @@ async def list_events( """ Returns events visible to the current user. + Regular users can see events related to themselves and to projects they are members of. + Global admins can see all events. + The results are paginated. To get the next page, pass `recorded_at` and `id` of the last event from the previous page as `prev_recorded_at` and `prev_id`. """ + if not FeatureFlags.EVENTS: + raise ServerClientError("Events are disabled on this server") return CustomORJSONResponse( await events_services.list_events( session=session, diff --git a/src/dstack/_internal/server/schemas/events.py b/src/dstack/_internal/server/schemas/events.py index fcd07dbba0..05ed5b323c 100644 --- a/src/dstack/_internal/server/schemas/events.py +++ b/src/dstack/_internal/server/schemas/events.py @@ -1,30 +1,177 @@ import uuid from datetime import datetime -from typing import Optional +from typing import Annotated, Optional from uuid import UUID -from pydantic import Field +from pydantic import Field, root_validator from dstack._internal.core.models.common import CoreModel from dstack._internal.core.models.events import EventTargetType +MIN_FILTER_ITEMS = 1 +MAX_FILTER_ITEMS = 16 # Conservative limit to prevent overly complex db queries + class ListEventsRequest(CoreModel): - # TODO: docs - # TODO: restrict list length for filters? - # TODO: forbid contradicting filters? - target_projects: Optional[list[uuid.UUID]] = None - target_users: Optional[list[uuid.UUID]] = None - target_fleets: Optional[list[uuid.UUID]] = None - target_instances: Optional[list[uuid.UUID]] = None - target_runs: Optional[list[uuid.UUID]] = None - target_jobs: Optional[list[uuid.UUID]] = None - within_projects: Optional[list[uuid.UUID]] = None - within_fleets: Optional[list[uuid.UUID]] = None - within_runs: Optional[list[uuid.UUID]] = None - include_target_types: Optional[list[EventTargetType]] = None - actors: Optional[list[Optional[uuid.UUID]]] = None + target_projects: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of project IDs." + " The response will only include events that target the specified projects" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_users: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of user IDs." + " The response will only include events that target the specified users" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_fleets: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of fleet IDs." + " The response will only include events that target the specified fleets" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_instances: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of instance IDs." + " The response will only include events that target the specified instances" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_runs: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of run IDs." + " The response will only include events that target the specified runs" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + target_jobs: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of job IDs." + " The response will only include events that target the specified jobs" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + within_projects: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of project IDs." + " The response will only include events that target the specified projects" + " or any entities within those projects" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + within_fleets: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of fleet IDs." + " The response will only include events that target the specified fleets" + " or instances within those fleets" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + within_runs: Annotated[ + Optional[list[uuid.UUID]], + Field( + description=( + "List of run IDs." + " The response will only include events that target the specified runs" + " or jobs within those runs" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + include_target_types: Annotated[ + Optional[list[EventTargetType]], + Field( + description=( + "List of target types." + " The response will only include events that have a target" + " of one of the specified types" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None + actors: Annotated[ + Optional[list[Optional[uuid.UUID]]], + Field( + description=( + "List of user IDs or `null` values." + " The response will only include events about actions" + " performed by the specified users," + " or performed by the system if `null` is specified" + ), + min_items=MIN_FILTER_ITEMS, + max_items=MAX_FILTER_ITEMS, + ), + ] = None prev_recorded_at: Optional[datetime] = None prev_id: Optional[UUID] = None limit: int = Field(100, ge=1, le=100) ascending: bool = False + + @root_validator + def _validate_target_filters(cls, values): + """ + Raise an error if more than one target_* filter is set. Setting multiple + target_* filters would always result in an empty response, which might confuse users. + """ + + target_filters = [name for name in cls.__fields__ if name.startswith("target_")] + set_filters = [f for f in target_filters if values.get(f) is not None] + if len(set_filters) > 1: + raise ValueError( + f"At most one target_* filter can be set at a time. Got {', '.join(set_filters)}" + ) + return values + + @root_validator + def _validate_within_filters(cls, values): + """ + Raise an error if more than one within_* filter is set. Setting multiple + within_* filters is either redundant or incorrect. Each within_* filter + may also lead to additional db queries, causing unnecessary load. + """ + + within_filters = [name for name in cls.__fields__ if name.startswith("within_")] + set_filters = [f for f in within_filters if values.get(f) is not None] + if len(set_filters) > 1: + raise ValueError( + f"At most one within_* filter can be set at a time. Got {', '.join(set_filters)}" + ) + return values diff --git a/src/dstack/_internal/server/services/events.py b/src/dstack/_internal/server/services/events.py index 8d29d4c8a0..34e7b72378 100644 --- a/src/dstack/_internal/server/services/events.py +++ b/src/dstack/_internal/server/services/events.py @@ -1,10 +1,9 @@ import uuid -from collections.abc import Iterable from dataclasses import dataclass from datetime import datetime from typing import Optional, Union -from sqlalchemy import and_, or_, select +from sqlalchemy import and_, exists, or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload @@ -22,16 +21,39 @@ RunModel, UserModel, ) +from dstack._internal.server.services.logging import fmt_entity +from dstack._internal.settings import FeatureFlags from dstack._internal.utils.common import get_current_datetime +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) class SystemActor: - pass + """Represents the system as the actor of an event""" + + def fmt(self) -> str: + return "system" @dataclass class UserActor: + """ + Represents a user as the actor of an event. + + **NOTE**: Prefer using `UserActor.from_user` to create `UserActor` instances, + unless you don't have a complete `UserModel` available. + """ + user_id: uuid.UUID + user_name: str + + @staticmethod + def from_user(user: UserModel) -> "UserActor": + return UserActor(user_id=user.id, user_name=user.name) + + def fmt(self) -> str: + return fmt_entity("user", self.user_id, self.user_name) AnyActor = Union[SystemActor, UserActor] @@ -116,13 +138,63 @@ def from_model( ) raise ValueError(f"Unsupported model type: {type(model)}") + def fmt(self) -> str: + return fmt_entity(self.type, self.id, self.name) + + +def emit(session: AsyncSession, message: str, actor: AnyActor, targets: list[Target]) -> None: + """ + Emit an event - add it to the current session without committing. + + Usage guidelines: + - Message: + - Use past tense - events should describe completed actions. + Bad: "Creating project" + Good: "Project created" + - Do not duplicate target and actor names in the message. + Bad: "User John created project MyProject" + Good: "Project created" + - Actor: + - Pass `UserActor` for events about user actions, e.g., in API handlers. + - Pass `SystemActor` for system-generated events, e.g., in background jobs. + - Targets: + - Link the event to one or more entities affected by it. + E.g., for a "Job assigned to instance" event, link it to the job and the instance. + - Do not link the event to parent entities of the affected entities. + E.g., the "Instance created" event should be linked to the instance only, + not to the fleet or project. Transitive relationships with parent entities + are inferred automatically when listing events using the within_* filters. + - **Important**: If linking the event to multiple targets with different access scopes + (e.g., entities in different projects, or different users), ensure that this does not + leak sensitive information. If a user has access to at least one of the targets, + they will see the entire event with all targets. If this is not desired, + consider emitting multiple separate events instead. + """ + if not FeatureFlags.EVENTS: + return + + if not targets: + raise ValueError("At least one target must be specified") + if not message: + raise ValueError("Message cannot be empty") + if message.strip() != message: + raise ValueError("Message cannot have leading or trailing whitespace") + if "\n" in message: + raise ValueError("Message cannot contain newlines") + if message.endswith("."): + raise ValueError("Message cannot end with a period") + + logger.info( + "Emitting event: %s. Event targets: %s. Actor: %s", + message, + ", ".join(target.fmt() for target in targets), + actor.fmt(), + ) -def emit(session: AsyncSession, message: str, actor: AnyActor, targets: Iterable[Target]) -> None: - # TODO: docstring + best practices - # TODO: log each event if settings.SERVER_EVENTS_TTL_SECONDS <= 0: return event = EventModel( + id=uuid.uuid4(), message=message, actor_user_id=actor.user_id if isinstance(actor, UserActor) else None, recorded_at=get_current_datetime(), @@ -137,8 +209,6 @@ def emit(session: AsyncSession, message: str, actor: AnyActor, targets: Iterable entity_name=target.name, ) ) - if not event.targets: - raise ValueError("At least one target must be specified for an event") session.add(event) @@ -161,13 +231,16 @@ async def list_events( limit: int, ascending: bool, ) -> list[Event]: - filters = [] + target_filters = [] if user.global_role != GlobalRole.ADMIN: - filters.append( + query = select(MemberModel.project_id).where(MemberModel.user_id == user.id) + res = await session.execute(query) + # In Postgres, fetching project IDs separately is orders of magnitude faster + # than using a subquery. + project_ids = list(res.unique().scalars().all()) + target_filters.append( or_( - EventTargetModel.entity_project_id.in_( - select(MemberModel.project_id).where(MemberModel.user_id == user.id) - ), + EventTargetModel.entity_project_id.in_(project_ids), and_( EventTargetModel.entity_project_id.is_(None), EventTargetModel.entity_type == EventTargetType.USER, @@ -176,51 +249,56 @@ async def list_events( ) ) if target_projects is not None: - filters.append( + target_filters.append( and_( EventTargetModel.entity_type == EventTargetType.PROJECT, EventTargetModel.entity_id.in_(target_projects), ) ) if target_users is not None: - filters.append( + target_filters.append( and_( EventTargetModel.entity_type == EventTargetType.USER, EventTargetModel.entity_id.in_(target_users), ) ) if target_fleets is not None: - filters.append( + target_filters.append( and_( EventTargetModel.entity_type == EventTargetType.FLEET, EventTargetModel.entity_id.in_(target_fleets), ) ) if target_instances is not None: - filters.append( + target_filters.append( and_( EventTargetModel.entity_type == EventTargetType.INSTANCE, EventTargetModel.entity_id.in_(target_instances), ) ) if target_runs is not None: - filters.append( + target_filters.append( and_( EventTargetModel.entity_type == EventTargetType.RUN, EventTargetModel.entity_id.in_(target_runs), ) ) if target_jobs is not None: - filters.append( + target_filters.append( and_( EventTargetModel.entity_type == EventTargetType.JOB, EventTargetModel.entity_id.in_(target_jobs), ) ) if within_projects is not None: - filters.append(EventTargetModel.entity_project_id.in_(within_projects)) + target_filters.append(EventTargetModel.entity_project_id.in_(within_projects)) if within_fleets is not None: - filters.append( + query = select(InstanceModel.id).where(InstanceModel.fleet_id.in_(within_fleets)) + res = await session.execute(query) + # In Postgres, fetching instance IDs separately is orders of magnitude faster + # than using a subquery. + instance_ids = list(res.unique().scalars().all()) + target_filters.append( or_( and_( EventTargetModel.entity_type == EventTargetType.FLEET, @@ -228,14 +306,17 @@ async def list_events( ), and_( EventTargetModel.entity_type == EventTargetType.INSTANCE, - EventTargetModel.entity_id.in_( - select(InstanceModel.id).where(InstanceModel.fleet_id.in_(within_fleets)) - ), + EventTargetModel.entity_id.in_(instance_ids), ), ) ) if within_runs is not None: - filters.append( + query = select(JobModel.id).where(JobModel.run_id.in_(within_runs)) + res = await session.execute(query) + # In Postgres, fetching job IDs separately is orders of magnitude faster + # than using a subquery. + job_ids = list(res.unique().scalars().all()) + target_filters.append( or_( and_( EventTargetModel.entity_type == EventTargetType.RUN, @@ -243,16 +324,16 @@ async def list_events( ), and_( EventTargetModel.entity_type == EventTargetType.JOB, - EventTargetModel.entity_id.in_( - select(JobModel.id).where(JobModel.run_id.in_(within_runs)) - ), + EventTargetModel.entity_id.in_(job_ids), ), ) ) if include_target_types is not None: - filters.append(EventTargetModel.entity_type.in_(include_target_types)) + target_filters.append(EventTargetModel.entity_type.in_(include_target_types)) + + event_filters = [] if actors is not None: - filters.append( + event_filters.append( or_( EventModel.actor_user_id.is_(None) if None in actors else False, EventModel.actor_user_id.in_( @@ -263,9 +344,9 @@ async def list_events( if prev_recorded_at is not None: if ascending: if prev_id is None: - filters.append(EventModel.recorded_at > prev_recorded_at) + event_filters.append(EventModel.recorded_at > prev_recorded_at) else: - filters.append( + event_filters.append( or_( EventModel.recorded_at > prev_recorded_at, and_(EventModel.recorded_at == prev_recorded_at, EventModel.id < prev_id), @@ -273,9 +354,9 @@ async def list_events( ) else: if prev_id is None: - filters.append(EventModel.recorded_at < prev_recorded_at) + event_filters.append(EventModel.recorded_at < prev_recorded_at) else: - filters.append( + event_filters.append( or_( EventModel.recorded_at < prev_recorded_at, and_(EventModel.recorded_at == prev_recorded_at, EventModel.id > prev_id), @@ -290,17 +371,20 @@ async def list_events( .limit(limit) .options( joinedload(EventModel.targets), - joinedload(EventModel.user).load_only(UserModel.name), + joinedload(EventModel.actor_user).load_only(UserModel.name), ) ) - if filters: - # Apply filters in a subquery, since it requires joining events with targets. - # Can't join in the outer query, as it results in LIMIT being applied to targets - # instead of events. - event_ids_subquery = ( - select(EventModel.id).join(EventModel.targets).where(*filters).distinct() + if event_filters: + query = query.where(*event_filters) + if target_filters: + query = query.where( + exists().where( + and_( + EventTargetModel.event_id == EventModel.id, + *target_filters, + ) + ) ) - query = query.where(EventModel.id.in_(event_ids_subquery)) res = await session.execute(query) event_models = res.unique().scalars().all() return list(map(event_model_to_event, event_models)) @@ -322,6 +406,6 @@ def event_model_to_event(event_model: EventModel) -> Event: message=event_model.message, recorded_at=event_model.recorded_at, actor_user_id=event_model.actor_user_id, - actor_user=event_model.user.name if event_model.user else None, + actor_user=event_model.actor_user.name if event_model.actor_user else None, targets=targets, ) diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 110288c425..b1285134ec 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -756,7 +756,7 @@ async def _create_fleet( events.emit( session, f"Fleet created. Status: {fleet_model.status.upper()}", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(fleet_model)], ) if spec.configuration.ssh_config is not None: diff --git a/src/dstack/_internal/server/services/logging.py b/src/dstack/_internal/server/services/logging.py index c738534198..bf1b72f099 100644 --- a/src/dstack/_internal/server/services/logging.py +++ b/src/dstack/_internal/server/services/logging.py @@ -1,3 +1,4 @@ +import uuid from typing import Union from dstack._internal.server.models import ( @@ -12,13 +13,17 @@ def fmt(model: Union[RunModel, JobModel, InstanceModel, GatewayModel, ProbeModel]) -> str: """Consistent string representation of a model for logging.""" if isinstance(model, RunModel): - return f"run({model.id.hex[:6]}){model.run_name}" + return fmt_entity("run", model.id, model.run_name) if isinstance(model, JobModel): - return f"job({model.id.hex[:6]}){model.job_name}" + return fmt_entity("job", model.id, model.job_name) if isinstance(model, InstanceModel): - return f"instance({model.id.hex[:6]}){model.name}" + return fmt_entity("instance", model.id, model.name) if isinstance(model, GatewayModel): - return f"gateway({model.id.hex[:6]}){model.name}" + return fmt_entity("gateway", model.id, model.name) if isinstance(model, ProbeModel): - return f"probe({model.id.hex[:6]}){model.name}" + return fmt_entity("probe", model.id, model.name) return str(model) + + +def fmt_entity(entity_type: str, entity_id: uuid.UUID, entity_name: str) -> str: + return f"{entity_type}({entity_id.hex[:6]}){entity_name}" diff --git a/src/dstack/_internal/server/services/projects.py b/src/dstack/_internal/server/services/projects.py index d8e205c9ae..2004b5cccd 100644 --- a/src/dstack/_internal/server/services/projects.py +++ b/src/dstack/_internal/server/services/projects.py @@ -543,7 +543,7 @@ async def create_project_model( events.emit( session, "Project created", - actor=events.UserActor(owner.id), + actor=events.UserActor.from_user(owner), targets=[events.Target.from_model(project)], ) await session.commit() diff --git a/src/dstack/_internal/server/services/runs/__init__.py b/src/dstack/_internal/server/services/runs/__init__.py index 6bf8e8dfef..18c0847b41 100644 --- a/src/dstack/_internal/server/services/runs/__init__.py +++ b/src/dstack/_internal/server/services/runs/__init__.py @@ -487,7 +487,7 @@ async def submit_run( events.emit( session, f"Run submitted. Status: {run_model.status.upper()}", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(run_model)], ) diff --git a/src/dstack/_internal/server/services/users.py b/src/dstack/_internal/server/services/users.py index 01b90b0087..a42fb64a1a 100644 --- a/src/dstack/_internal/server/services/users.py +++ b/src/dstack/_internal/server/services/users.py @@ -112,7 +112,7 @@ async def create_user( events.emit( session, "User created", - actor=events.UserActor(creator.id) if creator else events.UserActor(user.id), + actor=events.UserActor.from_user(creator) if creator else events.UserActor.from_user(user), targets=[events.Target.from_model(user)], ) await session.commit() diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index f0bdc96017..52f56f23e6 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -106,8 +106,10 @@ os.getenv("DSTACK_SERVER_INSTANCE_HEALTH_MIN_COLLECT_INTERVAL_SECONDS", 60) ) -# TODO: docs -SERVER_EVENTS_TTL_SECONDS = int(os.getenv("DSTACK_SERVER_EVENTS_TTL_SECONDS", 2 * 30 * 24 * 3600)) +SERVER_EVENTS_TTL_SECONDS = int( + # default documented in reference/environment-variables.md, keep in sync + os.getenv("DSTACK_SERVER_EVENTS_TTL_SECONDS", 30 * 24 * 3600) +) SERVER_KEEP_SHIM_TASKS = os.getenv("DSTACK_SERVER_KEEP_SHIM_TASKS") is not None diff --git a/src/dstack/_internal/settings.py b/src/dstack/_internal/settings.py index 0462ddcdfb..86ca6fe31a 100644 --- a/src/dstack/_internal/settings.py +++ b/src/dstack/_internal/settings.py @@ -44,3 +44,6 @@ class FeatureFlags: # - Makes `repos[].path` required, unless the client is older than 0.19.27, # in which case `/workflow` is still used. LEGACY_REPO_DIR_DISABLED = os.getenv("DSTACK_FF_LEGACY_REPO_DIR_DISABLED") is not None + + # Server-side flag to enable event emission and Events API + EVENTS = os.getenv("DSTACK_FF_EVENTS") is not None diff --git a/src/tests/_internal/server/background/tasks/test_process_events.py b/src/tests/_internal/server/background/tasks/test_process_events.py index 5e344131ef..1dc29167da 100644 --- a/src/tests/_internal/server/background/tasks/test_process_events.py +++ b/src/tests/_internal/server/background/tasks/test_process_events.py @@ -1,4 +1,5 @@ from datetime import datetime +from typing import Generator from unittest.mock import patch import pytest @@ -13,6 +14,12 @@ from dstack._internal.server.testing.common import create_user +@pytest.fixture(autouse=True) +def set_feature_flag() -> Generator[None, None, None]: + with patch("dstack._internal.settings.FeatureFlags.EVENTS", True): + yield + + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) async def test_deletes_old_events(test_db, session: AsyncSession) -> None: @@ -22,7 +29,7 @@ async def test_deletes_old_events(test_db, session: AsyncSession) -> None: events.emit( session, message=f"Event {i}", - actor=events.UserActor(user_id=user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(user)], ) await session.commit() diff --git a/src/tests/_internal/server/routers/test_events.py b/src/tests/_internal/server/routers/test_events.py index 5add5fd632..149f28ee3a 100644 --- a/src/tests/_internal/server/routers/test_events.py +++ b/src/tests/_internal/server/routers/test_events.py @@ -1,4 +1,7 @@ +import uuid from datetime import datetime +from typing import Generator +from unittest.mock import patch import pytest from freezegun import freeze_time @@ -26,6 +29,93 @@ ] +@pytest.fixture(autouse=True) +def set_feature_flag() -> Generator[None, None, None]: + with patch("dstack._internal.settings.FeatureFlags.EVENTS", True): + yield + + +class TestListEventsGeneral: + async def test_response_format(self, session: AsyncSession, client: AsyncClient) -> None: + user = await create_user(session=session, name="test_user") + project = await create_project(session=session, owner=user, name="test_project") + await add_project_member( + session=session, + project=project, + user=user, + project_role=ProjectRole.ADMIN, + ) + event_ids = [uuid.uuid4() for _ in range(2)] + with patch("uuid.uuid4", side_effect=event_ids): + with freeze_time(datetime(2026, 1, 1, 12, 0, 0)): + events.emit( + session, + "User added to project", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(user), events.Target.from_model(project)], + ) + with freeze_time(datetime(2026, 1, 1, 12, 0, 1)): + events.emit( + session, + "Project updated", + actor=events.SystemActor(), + targets=[events.Target.from_model(project)], + ) + await session.commit() + + resp = await client.post("/api/events/list", headers=get_auth_headers(user.token), json={}) + resp.raise_for_status() + resp_data = resp.json() + for event in resp_data: + event["targets"].sort(key=lambda t: t["type"]) # for consistent comparison + assert resp_data == [ + { + "id": str(event_ids[1]), + "message": "Project updated", + "recorded_at": "2026-01-01T12:00:01+00:00", + "actor_user_id": None, + "actor_user": None, + "targets": [ + { + "type": "project", + "project_id": str(project.id), + "id": str(project.id), + "name": "test_project", + }, + ], + }, + { + "id": str(event_ids[0]), + "message": "User added to project", + "recorded_at": "2026-01-01T12:00:00+00:00", + "actor_user_id": str(user.id), + "actor_user": "test_user", + "targets": [ + { + "type": "project", + "project_id": str(project.id), + "id": str(project.id), + "name": "test_project", + }, + { + "type": "user", + "project_id": None, + "id": str(user.id), + "name": "test_user", + }, + ], + }, + ] + + async def test_empty_response_when_no_events( + self, session: AsyncSession, client: AsyncClient + ) -> None: + user = await create_user(session=session) + resp = await client.post("/api/events/list", headers=get_auth_headers(user.token), json={}) + resp.raise_for_status() + assert resp.json() == [] + + class TestListEventsAccessControl: async def test_user_sees_events_about_themselves( self, session: AsyncSession, client: AsyncClient @@ -43,13 +133,13 @@ async def test_user_sees_events_about_themselves( events.emit( session, "User created", - actor=events.UserActor(admin_user.id), + actor=events.UserActor.from_user(admin_user), targets=[events.Target.from_model(admin_user)], ) events.emit( session, "User created", - actor=events.UserActor(admin_user.id), + actor=events.UserActor.from_user(admin_user), targets=[events.Target.from_model(regular_user)], ) await session.commit() @@ -117,25 +207,25 @@ async def test_user_sees_events_within_their_project( events.emit( session, "Project created", - actor=events.UserActor(admin_user.id), + actor=events.UserActor.from_user(admin_user), targets=[events.Target.from_model(admin_project)], ) events.emit( session, "Project created", - actor=events.UserActor(admin_user.id), + actor=events.UserActor.from_user(admin_user), targets=[events.Target.from_model(regular_project)], ) events.emit( session, "Fleet created", - actor=events.UserActor(admin_user.id), + actor=events.UserActor.from_user(admin_user), targets=[events.Target.from_model(admin_fleet)], ) events.emit( session, "Fleet created", - actor=events.UserActor(admin_user.id), + actor=events.UserActor.from_user(admin_user), targets=[events.Target.from_model(regular_fleet)], ) await session.commit() @@ -172,13 +262,13 @@ async def test_filters_do_not_bypass_access_control( events.emit( session, "Project created", - actor=events.UserActor(admin.id), + actor=events.UserActor.from_user(admin), targets=[events.Target.from_model(project)], ) events.emit( session, "Fleet created", - actor=events.UserActor(admin.id), + actor=events.UserActor.from_user(admin), targets=[events.Target.from_model(fleet)], ) await session.commit() @@ -239,25 +329,25 @@ async def test_target_projects(self, session: AsyncSession, client: AsyncClient) events.emit( session, "User created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(user)], ) events.emit( session, "Project created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(project_a)], ) events.emit( session, "Project created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(project_b)], ) events.emit( session, "Fleet created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(fleet_a)], ) await session.commit() @@ -295,19 +385,19 @@ async def test_target_users(self, session: AsyncSession, client: AsyncClient) -> events.emit( session, "User created", - actor=events.UserActor(user_a.id), + actor=events.UserActor.from_user(user_a), targets=[events.Target.from_model(user_a)], ) events.emit( session, "User created", - actor=events.UserActor(user_b.id), + actor=events.UserActor.from_user(user_b), targets=[events.Target.from_model(user_b)], ) events.emit( session, "Project created", - actor=events.UserActor(user_a.id), + actor=events.UserActor.from_user(user_a), targets=[events.Target.from_model(project_a)], ) await session.commit() @@ -359,19 +449,19 @@ async def test_target_fleets(self, session: AsyncSession, client: AsyncClient) - events.emit( session, "Fleet created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(fleet_a)], ) events.emit( session, "Fleet created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(fleet_b)], ) events.emit( session, "Instance created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(instance_a)], ) await session.commit() @@ -419,19 +509,19 @@ async def test_target_instances(self, session: AsyncSession, client: AsyncClient events.emit( session, "Fleet created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(fleet)], ) events.emit( session, "Instance created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(instance_a)], ) events.emit( session, "Instance created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(instance_b)], ) await session.commit() @@ -487,19 +577,19 @@ async def test_target_runs(self, session: AsyncSession, client: AsyncClient) -> events.emit( session, "Run created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(run_a)], ) events.emit( session, "Run created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(run_b)], ) events.emit( session, "Job created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(job_a)], ) await session.commit() @@ -552,19 +642,19 @@ async def test_target_jobs(self, session: AsyncSession, client: AsyncClient) -> events.emit( session, "Run created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(run)], ) events.emit( session, "Job created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(job_a)], ) events.emit( session, "Job created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(job_b)], ) await session.commit() @@ -608,31 +698,31 @@ async def test_within_projects(self, session: AsyncSession, client: AsyncClient) events.emit( session, "User created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(user)], ) events.emit( session, "Project created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(project_a)], ) events.emit( session, "Project created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(project_b)], ) events.emit( session, "Fleet created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(fleet_a)], ) events.emit( session, "Instance created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(instance_a)], ) await session.commit() @@ -682,25 +772,25 @@ async def test_within_fleets(self, session: AsyncSession, client: AsyncClient) - events.emit( session, "Project created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(project)], ) events.emit( session, "Fleet created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(fleet_a)], ) events.emit( session, "Fleet created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(fleet_b)], ) events.emit( session, "Instance created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(isinstance_a)], ) await session.commit() @@ -754,25 +844,25 @@ async def test_within_runs(self, session: AsyncSession, client: AsyncClient) -> events.emit( session, "Project created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(project)], ) events.emit( session, "Run created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(run_a)], ) events.emit( session, "Run created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(run_b)], ) events.emit( session, "Job created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(job_a)], ) await session.commit() @@ -813,19 +903,19 @@ async def test_include_target_types(self, session: AsyncSession, client: AsyncCl events.emit( session, "Project created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(project)], ) events.emit( session, "Fleet created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(fleet)], ) events.emit( session, "Instance created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(instance)], ) await session.commit() @@ -860,6 +950,75 @@ async def test_include_target_types(self, session: AsyncSession, client: AsyncCl "fleet", } + async def test_within_projects_and_include_target_types( + self, session: AsyncSession, client: AsyncClient + ) -> None: + user = await create_user(session=session) + project_a = await create_project(session=session, name="project_a", owner=user) + project_b = await create_project(session=session, name="project_b", owner=user) + fleet_a = await create_fleet(session=session, project=project_a) + instance_a = await create_instance( + session=session, + project=project_a, + fleet=fleet_a, + ) + fleet_b = await create_fleet(session=session, project=project_b) + instance_b = await create_instance( + session=session, + project=project_b, + fleet=fleet_b, + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project_a)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_a)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_a)], + ) + events.emit( + session, + "Project created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(project_b)], + ) + events.emit( + session, + "Fleet created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(fleet_b)], + ) + events.emit( + session, + "Instance created", + actor=events.UserActor.from_user(user), + targets=[events.Target.from_model(instance_b)], + ) + await session.commit() + + resp = await client.post( + "/api/events/list", + headers=get_auth_headers(user.token), + json={ + "within_projects": [str(project_a.id)], + "include_target_types": ["fleet"], + }, + ) + resp.raise_for_status() + assert len(resp.json()) == 1 + assert resp.json()[0]["targets"][0]["type"] == "fleet" + assert resp.json()[0]["targets"][0]["id"] == str(fleet_a.id) + async def test_actors(self, session: AsyncSession, client: AsyncClient) -> None: user_a = await create_user(session=session, name="user_a") user_b = await create_user(session=session, name="user_b") @@ -868,13 +1027,13 @@ async def test_actors(self, session: AsyncSession, client: AsyncClient) -> None: events.emit( session, "Project created", - actor=events.UserActor(user_a.id), + actor=events.UserActor.from_user(user_a), targets=[events.Target.from_model(project_a)], ) events.emit( session, "Project created", - actor=events.UserActor(user_b.id), + actor=events.UserActor.from_user(user_b), targets=[events.Target.from_model(project_b)], ) events.emit( @@ -945,7 +1104,7 @@ async def test_event_included_if_at_least_one_target_is_within_filters( events.emit( session, "Fleet instances created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[ events.Target.from_model(instance_a), events.Target.from_model(instance_b), @@ -959,7 +1118,7 @@ async def test_event_included_if_at_least_one_target_is_within_filters( events.emit( session, "Instance created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(instance_c)], ) await session.commit() @@ -1007,7 +1166,7 @@ async def test_pagination( events.emit( session, "User created", - actor=events.UserActor(user.id), + actor=events.UserActor.from_user(user), targets=[events.Target.from_model(user)], ) await session.commit() From bea2d42c6e847451a8e0ba480526e96bf3d637ea Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Fri, 5 Dec 2025 09:12:48 +0100 Subject: [PATCH 3/3] Use `list_enum_values_for_annotation` --- src/dstack/_internal/core/models/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dstack/_internal/core/models/events.py b/src/dstack/_internal/core/models/events.py index f40854892f..3fb556101e 100644 --- a/src/dstack/_internal/core/models/events.py +++ b/src/dstack/_internal/core/models/events.py @@ -6,6 +6,7 @@ from pydantic import Field from dstack._internal.core.models.common import CoreModel +from dstack._internal.utils.common import list_enum_values_for_annotation class EventTargetType(str, Enum): @@ -23,7 +24,7 @@ class EventTarget(CoreModel): Field( description=( f"Type of the target entity." - f" One of: {', '.join([f'`{t}`' for t in EventTargetType])}" + f" One of: {list_enum_values_for_annotation(EventTargetType)}" ) ), ]