From e5a4cc558ad765e529424e130b4c6a0e14454089 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Wed, 10 Dec 2025 09:27:08 +0100 Subject: [PATCH] Add the `dstack event` command for viewing events --- src/dstack/_internal/cli/commands/event.py | 126 ++++++++++++++++++ src/dstack/_internal/cli/main.py | 2 + src/dstack/_internal/cli/services/events.py | 63 +++++++++ src/dstack/_internal/server/schemas/events.py | 3 +- src/dstack/api/server/__init__.py | 5 + src/dstack/api/server/_events.py | 53 ++++++++ 6 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 src/dstack/_internal/cli/commands/event.py create mode 100644 src/dstack/_internal/cli/services/events.py create mode 100644 src/dstack/api/server/_events.py diff --git a/src/dstack/_internal/cli/commands/event.py b/src/dstack/_internal/cli/commands/event.py new file mode 100644 index 0000000000..3284e914b0 --- /dev/null +++ b/src/dstack/_internal/cli/commands/event.py @@ -0,0 +1,126 @@ +import argparse +from dataclasses import asdict + +from dstack._internal.cli.commands import APIBaseCommand +from dstack._internal.cli.services.events import EventListFilters, EventPaginator, print_event +from dstack._internal.cli.utils.common import ( + get_start_time, +) +from dstack._internal.core.models.events import EventTargetType +from dstack._internal.server.schemas.events import LIST_EVENTS_DEFAULT_LIMIT +from dstack.api import Client + + +class EventCommand(APIBaseCommand): + NAME = "event" + DESCRIPTION = "View events" + + def _register(self): + super()._register() + self._parser.set_defaults(subfunc=self._list) + subparsers = self._parser.add_subparsers(dest="action") + + list_parser = subparsers.add_parser( + "list", + help="List events within the selected project", + formatter_class=self._parser.formatter_class, + ) + list_parser.set_defaults(subfunc=self._list) + + for parser in [self._parser, list_parser]: + parser.add_argument( + "--since", + help=( + "Only show events newer than the specified date." + " Can be a duration (e.g. 10s, 5m, 1d) or an RFC 3339 string (e.g. 2023-09-24T15:30:00Z)." + f" If not specified, show the last {LIST_EVENTS_DEFAULT_LIMIT} events." + ), + type=str, + ) + target_filters_group = parser.add_mutually_exclusive_group() + target_filters_group.add_argument( + "--target-fleet", + action="append", + metavar="NAME", + dest="target_fleets", + help="Only show events that target the specified fleets", + ) + target_filters_group.add_argument( + "--target-run", + action="append", + metavar="NAME", + dest="target_runs", + help="Only show events that target the specified runs", + ) + within_filters_group = parser.add_mutually_exclusive_group() + within_filters_group.add_argument( + "--within-fleet", + action="append", + metavar="NAME", + dest="within_fleets", + help="Only show events that target the specified fleets or instances within those fleets", + ) + within_filters_group.add_argument( + "--within-run", + action="append", + metavar="NAME", + dest="within_runs", + help="Only show events that target the specified runs or jobs within those runs", + ) + parser.add_argument( + "--include-target-type", + action="append", + metavar="TYPE", + type=EventTargetType, + dest="include_target_types", + help="Only show events that target entities of the specified types", + ) + + def _command(self, args: argparse.Namespace): + super()._command(args) + args.subfunc(args) + + def _list(self, args: argparse.Namespace): + since = get_start_time(args.since) + filters = _build_filters(args, self.api) + + if since is not None: + events = EventPaginator(self.api.client.events).list( + filters=filters, since=since, ascending=True + ) + else: + events = reversed(self.api.client.events.list(ascending=False, **asdict(filters))) + try: + for event in events: + print_event(event) + except KeyboardInterrupt: + pass + + +def _build_filters(args: argparse.Namespace, api: Client) -> EventListFilters: + filters = EventListFilters() + + if args.target_fleets: + filters.target_fleets = [ + api.client.fleets.get(api.project, name).id for name in args.target_fleets + ] + elif args.target_runs: + filters.target_runs = [ + api.client.runs.get(api.project, name).id for name in args.target_runs + ] + + if args.within_fleets: + filters.within_fleets = [ + api.client.fleets.get(api.project, name).id for name in args.within_fleets + ] + elif args.within_runs: + filters.within_runs = [ + api.client.runs.get(api.project, name).id for name in args.within_runs + ] + else: + filters.within_projects = [api.client.projects.get(api.project).project_id] + + if args.include_target_types: + filters.include_target_types = args.include_target_types + + return filters diff --git a/src/dstack/_internal/cli/main.py b/src/dstack/_internal/cli/main.py index 55248f3cf0..98be45b8d5 100644 --- a/src/dstack/_internal/cli/main.py +++ b/src/dstack/_internal/cli/main.py @@ -8,6 +8,7 @@ from dstack._internal.cli.commands.attach import AttachCommand from dstack._internal.cli.commands.completion import CompletionCommand from dstack._internal.cli.commands.delete import DeleteCommand +from dstack._internal.cli.commands.event import EventCommand from dstack._internal.cli.commands.fleet import FleetCommand from dstack._internal.cli.commands.gateway import GatewayCommand from dstack._internal.cli.commands.init import InitCommand @@ -62,6 +63,7 @@ def main(): ApplyCommand.register(subparsers) AttachCommand.register(subparsers) DeleteCommand.register(subparsers) + EventCommand.register(subparsers) FleetCommand.register(subparsers) GatewayCommand.register(subparsers) InitCommand.register(subparsers) diff --git a/src/dstack/_internal/cli/services/events.py b/src/dstack/_internal/cli/services/events.py new file mode 100644 index 0000000000..c3e697234c --- /dev/null +++ b/src/dstack/_internal/cli/services/events.py @@ -0,0 +1,63 @@ +import uuid +from collections.abc import Iterator +from dataclasses import asdict, dataclass +from datetime import datetime +from typing import Optional + +from rich.text import Text + +from dstack._internal.cli.utils.common import console +from dstack._internal.core.models.events import Event, EventTargetType +from dstack._internal.server.schemas.events import LIST_EVENTS_DEFAULT_LIMIT +from dstack.api.server._events import EventsAPIClient + + +@dataclass +class EventListFilters: + target_fleets: Optional[list[uuid.UUID]] = None + target_runs: Optional[list[uuid.UUID]] = None + within_projects: Optional[list[uuid.UUID]] = None + within_fleets: Optional[list[uuid.UUID]] = None + within_runs: Optional[list[uuid.UUID]] = None + include_target_types: Optional[list[EventTargetType]] = None + + +class EventPaginator: + def __init__(self, client: EventsAPIClient) -> None: + self._client = client + + def list( + self, filters: EventListFilters, since: Optional[datetime], ascending: bool + ) -> Iterator[Event]: + prev_id = None + prev_recorded_at = since + while True: + events = self._client.list( + prev_id=prev_id, + prev_recorded_at=prev_recorded_at, + limit=LIST_EVENTS_DEFAULT_LIMIT, + ascending=ascending, + **asdict(filters), + ) + for event in events: + yield event + if len(events) < LIST_EVENTS_DEFAULT_LIMIT: + break + prev_id = events[-1].id + prev_recorded_at = events[-1].recorded_at + + +def print_event(event: Event) -> None: + recorded_at = event.recorded_at.astimezone().strftime("%Y-%m-%d %H:%M:%S") + targets = ", ".join(f"{target.type} {target.name}" for target in event.targets) + message = [ + Text(f"[{recorded_at}]", style="log.time"), + Text(event.message, style="log.message"), + Text(f"[{targets}]", style="secondary"), + ] + if event.actor_user: + message.append(Text(f"👤 {event.actor_user}")) + console.print( + *message, + soft_wrap=True, # Strictly one line per event. Allows for grepping + ) diff --git a/src/dstack/_internal/server/schemas/events.py b/src/dstack/_internal/server/schemas/events.py index 05ed5b323c..537a731970 100644 --- a/src/dstack/_internal/server/schemas/events.py +++ b/src/dstack/_internal/server/schemas/events.py @@ -10,6 +10,7 @@ MIN_FILTER_ITEMS = 1 MAX_FILTER_ITEMS = 16 # Conservative limit to prevent overly complex db queries +LIST_EVENTS_DEFAULT_LIMIT = 100 class ListEventsRequest(CoreModel): @@ -142,7 +143,7 @@ class ListEventsRequest(CoreModel): ] = None prev_recorded_at: Optional[datetime] = None prev_id: Optional[UUID] = None - limit: int = Field(100, ge=1, le=100) + limit: int = Field(LIST_EVENTS_DEFAULT_LIMIT, ge=1, le=100) ascending: bool = False @root_validator diff --git a/src/dstack/api/server/__init__.py b/src/dstack/api/server/__init__.py index be0e586e62..2ad94f0864 100644 --- a/src/dstack/api/server/__init__.py +++ b/src/dstack/api/server/__init__.py @@ -15,6 +15,7 @@ ) from dstack._internal.utils.logging import get_logger from dstack.api.server._backends import BackendsAPIClient +from dstack.api.server._events import EventsAPIClient from dstack.api.server._files import FilesAPIClient from dstack.api.server._fleets import FleetsAPIClient from dstack.api.server._gateways import GatewaysAPIClient @@ -122,6 +123,10 @@ def volumes(self) -> VolumesAPIClient: def files(self) -> FilesAPIClient: return FilesAPIClient(self._request, self._logger) + @property + def events(self) -> EventsAPIClient: + return EventsAPIClient(self._request, self._logger) + def get_token_hash(self) -> str: return hashlib.sha1(self._token.encode()).hexdigest()[:8] diff --git a/src/dstack/api/server/_events.py b/src/dstack/api/server/_events.py new file mode 100644 index 0000000000..2f5d6639a3 --- /dev/null +++ b/src/dstack/api/server/_events.py @@ -0,0 +1,53 @@ +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +from pydantic import parse_obj_as + +from dstack._internal.core.models.events import Event, EventTargetType +from dstack._internal.server.schemas.events import LIST_EVENTS_DEFAULT_LIMIT, ListEventsRequest +from dstack.api.server._group import APIClientGroup + + +class EventsAPIClient(APIClientGroup): + def list( + self, + target_projects: Optional[list[UUID]] = None, + target_users: Optional[list[UUID]] = None, + target_fleets: Optional[list[UUID]] = None, + target_instances: Optional[list[UUID]] = None, + target_runs: Optional[list[UUID]] = None, + target_jobs: Optional[list[UUID]] = None, + within_projects: Optional[list[UUID]] = None, + within_fleets: Optional[list[UUID]] = None, + within_runs: Optional[list[UUID]] = None, + include_target_types: Optional[list[EventTargetType]] = None, + actors: Optional[list[Optional[UUID]]] = None, + prev_recorded_at: Optional[datetime] = None, + prev_id: Optional[UUID] = None, + limit: int = LIST_EVENTS_DEFAULT_LIMIT, + ascending: bool = False, + ) -> list[Event]: + if prev_recorded_at is not None: + # Time zones other than UTC are misinterpreted by the server: + # https://github.com/dstackai/dstack/issues/3354 + prev_recorded_at = prev_recorded_at.astimezone(timezone.utc) + req = ListEventsRequest( + target_projects=target_projects, + target_users=target_users, + target_fleets=target_fleets, + target_instances=target_instances, + target_runs=target_runs, + target_jobs=target_jobs, + within_projects=within_projects, + within_fleets=within_fleets, + within_runs=within_runs, + include_target_types=include_target_types, + actors=actors, + prev_recorded_at=prev_recorded_at, + prev_id=prev_id, + limit=limit, + ascending=ascending, + ) + resp = self._request("/api/events/list", body=req.json()) + return parse_obj_as(list[Event.__response__], resp.json())