Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions src/dstack/_internal/cli/commands/event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import argparse
from dataclasses import asdict

from dstack._internal.cli.commands import APIBaseCommand
from dstack._internal.cli.services.events import EventListFilters, EventPaginator, print_event
from dstack._internal.cli.utils.common import (
get_start_time,
)
from dstack._internal.core.models.events import EventTargetType
from dstack._internal.server.schemas.events import LIST_EVENTS_DEFAULT_LIMIT
from dstack.api import Client


class EventCommand(APIBaseCommand):
NAME = "event"
DESCRIPTION = "View events"

def _register(self):
super()._register()
self._parser.set_defaults(subfunc=self._list)
subparsers = self._parser.add_subparsers(dest="action")

list_parser = subparsers.add_parser(
"list",
help="List events within the selected project",
formatter_class=self._parser.formatter_class,
)
list_parser.set_defaults(subfunc=self._list)

for parser in [self._parser, list_parser]:
parser.add_argument(
"--since",
help=(
"Only show events newer than the specified date."
" Can be a duration (e.g. 10s, 5m, 1d) or an RFC 3339 string (e.g. 2023-09-24T15:30:00Z)."
f" If not specified, show the last {LIST_EVENTS_DEFAULT_LIMIT} events."
),
type=str,
)
target_filters_group = parser.add_mutually_exclusive_group()
target_filters_group.add_argument(
"--target-fleet",
action="append",
metavar="NAME",
dest="target_fleets",
help="Only show events that target the specified fleets",
)
target_filters_group.add_argument(
"--target-run",
action="append",
metavar="NAME",
dest="target_runs",
help="Only show events that target the specified runs",
)
within_filters_group = parser.add_mutually_exclusive_group()
within_filters_group.add_argument(
"--within-fleet",
action="append",
metavar="NAME",
dest="within_fleets",
help="Only show events that target the specified fleets or instances within those fleets",
)
within_filters_group.add_argument(
"--within-run",
action="append",
metavar="NAME",
dest="within_runs",
help="Only show events that target the specified runs or jobs within those runs",
)
parser.add_argument(
"--include-target-type",
action="append",
metavar="TYPE",
type=EventTargetType,
dest="include_target_types",
help="Only show events that target entities of the specified types",
)

def _command(self, args: argparse.Namespace):
super()._command(args)
args.subfunc(args)

def _list(self, args: argparse.Namespace):
since = get_start_time(args.since)
filters = _build_filters(args, self.api)

if since is not None:
events = EventPaginator(self.api.client.events).list(
filters=filters, since=since, ascending=True
)
else:
events = reversed(self.api.client.events.list(ascending=False, **asdict(filters)))
try:
for event in events:
print_event(event)
except KeyboardInterrupt:
pass


def _build_filters(args: argparse.Namespace, api: Client) -> EventListFilters:
filters = EventListFilters()

if args.target_fleets:
filters.target_fleets = [
api.client.fleets.get(api.project, name).id for name in args.target_fleets
]
elif args.target_runs:
filters.target_runs = [
api.client.runs.get(api.project, name).id for name in args.target_runs
]

if args.within_fleets:
filters.within_fleets = [
api.client.fleets.get(api.project, name).id for name in args.within_fleets
]
elif args.within_runs:
filters.within_runs = [
api.client.runs.get(api.project, name).id for name in args.within_runs
]
else:
filters.within_projects = [api.client.projects.get(api.project).project_id]

if args.include_target_types:
filters.include_target_types = args.include_target_types

return filters
2 changes: 2 additions & 0 deletions src/dstack/_internal/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from dstack._internal.cli.commands.attach import AttachCommand
from dstack._internal.cli.commands.completion import CompletionCommand
from dstack._internal.cli.commands.delete import DeleteCommand
from dstack._internal.cli.commands.event import EventCommand
from dstack._internal.cli.commands.fleet import FleetCommand
from dstack._internal.cli.commands.gateway import GatewayCommand
from dstack._internal.cli.commands.init import InitCommand
Expand Down Expand Up @@ -62,6 +63,7 @@ def main():
ApplyCommand.register(subparsers)
AttachCommand.register(subparsers)
DeleteCommand.register(subparsers)
EventCommand.register(subparsers)
FleetCommand.register(subparsers)
GatewayCommand.register(subparsers)
InitCommand.register(subparsers)
Expand Down
63 changes: 63 additions & 0 deletions src/dstack/_internal/cli/services/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import uuid
from collections.abc import Iterator
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Optional

from rich.text import Text

from dstack._internal.cli.utils.common import console
from dstack._internal.core.models.events import Event, EventTargetType
from dstack._internal.server.schemas.events import LIST_EVENTS_DEFAULT_LIMIT
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls avoid importing from server in cli. Move to core if needed both for server and cli.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although api already imports from schemas everywhere, so never mind.

from dstack.api.server._events import EventsAPIClient


@dataclass
class EventListFilters:
target_fleets: Optional[list[uuid.UUID]] = None
target_runs: Optional[list[uuid.UUID]] = None
within_projects: Optional[list[uuid.UUID]] = None
within_fleets: Optional[list[uuid.UUID]] = None
within_runs: Optional[list[uuid.UUID]] = None
include_target_types: Optional[list[EventTargetType]] = None


class EventPaginator:
def __init__(self, client: EventsAPIClient) -> None:
self._client = client

def list(
self, filters: EventListFilters, since: Optional[datetime], ascending: bool
) -> Iterator[Event]:
prev_id = None
prev_recorded_at = since
while True:
events = self._client.list(
prev_id=prev_id,
prev_recorded_at=prev_recorded_at,
limit=LIST_EVENTS_DEFAULT_LIMIT,
ascending=ascending,
**asdict(filters),
)
for event in events:
yield event
if len(events) < LIST_EVENTS_DEFAULT_LIMIT:
break
prev_id = events[-1].id
prev_recorded_at = events[-1].recorded_at


def print_event(event: Event) -> None:
recorded_at = event.recorded_at.astimezone().strftime("%Y-%m-%d %H:%M:%S")
targets = ", ".join(f"{target.type} {target.name}" for target in event.targets)
message = [
Text(f"[{recorded_at}]", style="log.time"),
Text(event.message, style="log.message"),
Text(f"[{targets}]", style="secondary"),
]
if event.actor_user:
message.append(Text(f"👤 {event.actor_user}"))
console.print(
*message,
soft_wrap=True, # Strictly one line per event. Allows for grepping
)
3 changes: 2 additions & 1 deletion src/dstack/_internal/server/schemas/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

MIN_FILTER_ITEMS = 1
MAX_FILTER_ITEMS = 16 # Conservative limit to prevent overly complex db queries
LIST_EVENTS_DEFAULT_LIMIT = 100


class ListEventsRequest(CoreModel):
Expand Down Expand Up @@ -142,7 +143,7 @@ class ListEventsRequest(CoreModel):
] = None
prev_recorded_at: Optional[datetime] = None
prev_id: Optional[UUID] = None
limit: int = Field(100, ge=1, le=100)
limit: int = Field(LIST_EVENTS_DEFAULT_LIMIT, ge=1, le=100)
ascending: bool = False

@root_validator
Expand Down
5 changes: 5 additions & 0 deletions src/dstack/api/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from dstack._internal.utils.logging import get_logger
from dstack.api.server._backends import BackendsAPIClient
from dstack.api.server._events import EventsAPIClient
from dstack.api.server._files import FilesAPIClient
from dstack.api.server._fleets import FleetsAPIClient
from dstack.api.server._gateways import GatewaysAPIClient
Expand Down Expand Up @@ -122,6 +123,10 @@ def volumes(self) -> VolumesAPIClient:
def files(self) -> FilesAPIClient:
return FilesAPIClient(self._request, self._logger)

@property
def events(self) -> EventsAPIClient:
return EventsAPIClient(self._request, self._logger)

def get_token_hash(self) -> str:
return hashlib.sha1(self._token.encode()).hexdigest()[:8]

Expand Down
53 changes: 53 additions & 0 deletions src/dstack/api/server/_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID

from pydantic import parse_obj_as

from dstack._internal.core.models.events import Event, EventTargetType
from dstack._internal.server.schemas.events import LIST_EVENTS_DEFAULT_LIMIT, ListEventsRequest
from dstack.api.server._group import APIClientGroup


class EventsAPIClient(APIClientGroup):
def list(
self,
target_projects: Optional[list[UUID]] = None,
target_users: Optional[list[UUID]] = None,
target_fleets: Optional[list[UUID]] = None,
target_instances: Optional[list[UUID]] = None,
target_runs: Optional[list[UUID]] = None,
target_jobs: Optional[list[UUID]] = None,
within_projects: Optional[list[UUID]] = None,
within_fleets: Optional[list[UUID]] = None,
within_runs: Optional[list[UUID]] = None,
include_target_types: Optional[list[EventTargetType]] = None,
actors: Optional[list[Optional[UUID]]] = None,
prev_recorded_at: Optional[datetime] = None,
prev_id: Optional[UUID] = None,
limit: int = LIST_EVENTS_DEFAULT_LIMIT,
ascending: bool = False,
) -> list[Event]:
if prev_recorded_at is not None:
# Time zones other than UTC are misinterpreted by the server:
# https://github.com/dstackai/dstack/issues/3354
prev_recorded_at = prev_recorded_at.astimezone(timezone.utc)
req = ListEventsRequest(
target_projects=target_projects,
target_users=target_users,
target_fleets=target_fleets,
target_instances=target_instances,
target_runs=target_runs,
target_jobs=target_jobs,
within_projects=within_projects,
within_fleets=within_fleets,
within_runs=within_runs,
include_target_types=include_target_types,
actors=actors,
prev_recorded_at=prev_recorded_at,
prev_id=prev_id,
limit=limit,
ascending=ascending,
)
resp = self._request("/api/events/list", body=req.json())
return parse_obj_as(list[Event.__response__], resp.json())