From 8e3e3f44315c3969d442e1c54dcde0f1ffa1e84b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Tue, 17 Jun 2025 06:38:58 -0700 Subject: [PATCH 1/7] feat: Rich UI treatments --- Makefile | 3 +- pyproject.toml | 4 + src/tetra_rp/__init__.py | 2 + src/tetra_rp/client.py | 1 + .../core/resources/resource_manager.py | 14 +- src/tetra_rp/core/resources/serverless.py | 164 ++++--- src/tetra_rp/core/utils/rich_ui.py | 442 ++++++++++++++++++ src/tetra_rp/logger.py | 48 +- src/tetra_rp/stubs/live_serverless.py | 11 +- uv.lock | 9 + 10 files changed, 623 insertions(+), 75 deletions(-) create mode 100644 src/tetra_rp/core/utils/rich_ui.py diff --git a/Makefile b/Makefile index b273b6f5..6a3cb6d6 100644 --- a/Makefile +++ b/Makefile @@ -7,12 +7,11 @@ endif dev: uv sync --all-groups - make examples proto: # TODO: auto-generate proto files -examples: +examples: dev git submodule init git submodule update --remote @echo "🚀 Running make inside tetra-examples..."; \ diff --git a/pyproject.toml b/pyproject.toml index a0c034af..f9dbfbe0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,8 +21,12 @@ dependencies = [ "cloudpickle>=3.1.1", "runpod~=1.7.9", "python-dotenv>=1.0.0", + "rich>=14.0.0", ] +[project.optional-dependencies] +rich = ["rich>=13.0.0"] + [dependency-groups] dev = [ "ruff>=0.11.9", diff --git a/src/tetra_rp/__init__.py b/src/tetra_rp/__init__.py index 07d26204..fa8dfdcf 100644 --- a/src/tetra_rp/__init__.py +++ b/src/tetra_rp/__init__.py @@ -7,11 +7,13 @@ ResourceManager, ServerlessEndpoint, ) +from .core.utils.rich_ui import capture_local_prints __all__ = [ "get_logger", "remote", + "capture_local_prints", "CudaVersion", "GpuGroup", "LiveServerless", diff --git a/src/tetra_rp/client.py b/src/tetra_rp/client.py index 5218f9ae..9310feaa 100644 --- a/src/tetra_rp/client.py +++ b/src/tetra_rp/client.py @@ -1,5 +1,6 @@ from functools import wraps from tetra_rp import get_logger +from tetra_rp.core.utils.rich_ui import capture_local_prints from typing import List from .core.resources import ServerlessResource, ResourceManager from .stubs import stub_resource diff --git a/src/tetra_rp/core/resources/resource_manager.py b/src/tetra_rp/core/resources/resource_manager.py index 7a83cb88..d5f9aa2e 100644 --- a/src/tetra_rp/core/resources/resource_manager.py +++ b/src/tetra_rp/core/resources/resource_manager.py @@ -4,6 +4,7 @@ from tetra_rp import get_logger from tetra_rp.core.utils.singleton import SingletonMixin +from tetra_rp.core.utils.rich_ui import is_rich_enabled, create_reused_resource_panel, print_with_rich from .base import DeployableResource @@ -29,7 +30,8 @@ def _load_resources(self) -> Dict[str, DeployableResource]: try: with open(RESOURCE_STATE_FILE, "rb") as f: self._resources = cloudpickle.load(f) - log.debug(f"Loaded saved resources from {RESOURCE_STATE_FILE}") + if not is_rich_enabled(): + log.debug(f"Loaded saved resources from {RESOURCE_STATE_FILE}") except Exception as e: log.error(f"Failed to load resources from {RESOURCE_STATE_FILE}: {e}") return self._resources @@ -38,7 +40,8 @@ def _save_resources(self) -> None: """Persist state of resources to disk using cloudpickle.""" with open(RESOURCE_STATE_FILE, "wb") as f: cloudpickle.dump(self._resources, f) - log.debug(f"Saved resources in {RESOURCE_STATE_FILE}") + if not is_rich_enabled(): + log.debug(f"Saved resources in {RESOURCE_STATE_FILE}") def add_resource(self, uid: str, resource: DeployableResource): """Add a resource to the manager.""" @@ -68,7 +71,12 @@ async def get_or_deploy_resource( 
self.remove_resource(uid) return await self.get_or_deploy_resource(config) - log.debug(f"{existing} exists, reusing.") + if is_rich_enabled(): + # Show a panel for reused resources similar to fresh deployments + panel = create_reused_resource_panel(existing.name, existing.id, existing.console_url) + print_with_rich(panel) + else: + log.debug(f"{existing} exists, reusing.") return existing if deployed_resource := await config.deploy(): diff --git a/src/tetra_rp/core/resources/serverless.py b/src/tetra_rp/core/resources/serverless.py index 0466b81b..b9ed67a1 100644 --- a/src/tetra_rp/core/resources/serverless.py +++ b/src/tetra_rp/core/resources/serverless.py @@ -6,6 +6,14 @@ from tetra_rp import get_logger from tetra_rp.core.utils.backoff import get_backoff_delay +from tetra_rp.core.utils.rich_ui import ( + job_progress_tracker, + create_deployment_panel, + create_metrics_table, + format_status_text, + print_with_rich, + is_rich_enabled +) from runpod.endpoint.runner import Job from .cloud import runpod @@ -181,7 +189,11 @@ async def deploy(self) -> "DeployableResource": result = runpod.create_endpoint(**payload) if endpoint := self.__class__(**result): - log.info(f"Deployed: {endpoint}") + if is_rich_enabled(): + panel = create_deployment_panel(endpoint.name, endpoint.id, endpoint.console_url) + print_with_rich(panel) + else: + log.info(f"Deployed: {endpoint}") return endpoint raise ValueError("Deployment failed, no endpoint was returned.") @@ -229,7 +241,8 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput": raise ValueError("Serverless is not deployed") log_group = f"{self}" - log.info(f"{self.console_url} | API /run") + if not is_rich_enabled(): + log.info(f"{self.console_url} | API /run") try: # log.debug(f"[{log_group}] Payload: {payload}") @@ -238,62 +251,91 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput": log_subgroup = f"Job:{job.job_id}" - log.info(f"{log_group} | Started {log_subgroup}") - - current_pace = 0 - attempt = 0 - job_status = Status.UNKNOWN - last_status = job_status - - # Poll for job status - while True: - await asyncio.sleep(current_pace) - - # Check endpoint health - health = await asyncio.to_thread(self.endpoint.health) - health = ServerlessHealth(**health) - - if health.is_ready: - # Check job status - job_status = await asyncio.to_thread(job.status) - log.debug(f"{log_group} | Status: {job_status}") - else: - # Check worker status - job_status = health.workers.status.value - log.debug(f"{log_group} | Status: {job_status}") - - if health.workers.status in [ - Status.THROTTLED, - Status.UNHEALTHY, - Status.UNKNOWN, - ]: - log.debug(f"{log_group} | Health {health}") - - if attempt >= 10: - # Give up - log.info(f"{log_subgroup} | Cancelling") - await asyncio.to_thread(job.cancel) - raise RuntimeError(health.workers.status.value) - - # Adjust polling pace appropriately - if last_status == job_status: - # nothing changed, increase the gap - attempt += 1 - indicator = "." 
* (attempt // 2) if attempt % 2 == 0 else "" - if indicator: - log.info(f"{log_subgroup} | {indicator}") - else: - # status changed, reset the gap - log.info(f"{log_subgroup} | Status: {job_status}") - attempt = 0 - + # Use Rich progress tracker if available + with job_progress_tracker(job.job_id, self.name) as tracker: + if not is_rich_enabled(): + log.info(f"{log_group} | Started {log_subgroup}") + elif tracker: + # Initialize the progress tracker with starting status + tracker.update_status("IN_QUEUE", "Job submitted, waiting for worker...") + + current_pace = 0 + attempt = 0 + job_status = Status.UNKNOWN last_status = job_status - current_pace = get_backoff_delay(attempt) - - if job_status in ("COMPLETED", "FAILED", "CANCELLED"): - response = await asyncio.to_thread(job._fetch_job) - return JobOutput(**response) + # Poll for job status + while True: + await asyncio.sleep(current_pace) + + # Check endpoint health + health = await asyncio.to_thread(self.endpoint.health) + health = ServerlessHealth(**health) + + if health.is_ready: + # Check job status + job_status = await asyncio.to_thread(job.status) + if not is_rich_enabled(): + log.debug(f"{log_group} | Status: {job_status}") + else: + # Check worker status + job_status = health.workers.status.value + if not is_rich_enabled(): + log.debug(f"{log_group} | Status: {job_status}") + + if health.workers.status in [ + Status.THROTTLED, + Status.UNHEALTHY, + Status.UNKNOWN, + ]: + if not is_rich_enabled(): + log.debug(f"{log_group} | Health {health}") + + if attempt >= 10: + # Give up + if tracker: + tracker.update_status("CANCELLED", "Timeout after 10 attempts") + else: + log.info(f"{log_subgroup} | Cancelling") + await asyncio.to_thread(job.cancel) + raise RuntimeError(health.workers.status.value) + + # Update Rich progress tracker or fallback to standard logging + if last_status == job_status: + # nothing changed, increase the gap + attempt += 1 + if tracker: + tracker.show_progress_indicator() + elif not is_rich_enabled(): + indicator = "." * (attempt // 2) if attempt % 2 == 0 else "" + if indicator: + log.info(f"{log_subgroup} | {indicator}") + else: + # status changed, reset the gap + if tracker: + # Map RunPod status to user-friendly message + status_message = "" + if job_status == "IN_QUEUE": + status_message = "Waiting for worker..." + elif job_status == "INITIALIZING": + status_message = "Starting up worker..." + elif job_status == "RUNNING": + status_message = "Executing function..." 
+ + tracker.update_status(job_status, status_message) + elif not is_rich_enabled(): + log.info(f"{log_subgroup} | Status: {job_status}") + attempt = 0 + + last_status = job_status + + current_pace = get_backoff_delay(attempt) + + if job_status in ("COMPLETED", "FAILED", "CANCELLED"): + if tracker: + tracker.update_status(job_status) + response = await asyncio.to_thread(job._fetch_job) + return JobOutput(**response) except Exception as e: log.error(f"{log_group} | Exception: {e}") @@ -319,9 +361,13 @@ class JobOutput(BaseModel): error: Optional[str] = "" def model_post_init(self, __context): - log_group = f"Worker:{self.workerId}" - log.info(f"{log_group} | Delay Time: {self.delayTime} ms") - log.info(f"{log_group} | Execution Time: {self.executionTime} ms") + if is_rich_enabled(): + metrics_table = create_metrics_table(self.delayTime, self.executionTime, self.workerId) + print_with_rich(metrics_table) + else: + log_group = f"Worker:{self.workerId}" + log.info(f"{log_group} | Delay Time: {self.delayTime} ms") + log.info(f"{log_group} | Execution Time: {self.executionTime} ms") class Status(str, Enum): diff --git a/src/tetra_rp/core/utils/rich_ui.py b/src/tetra_rp/core/utils/rich_ui.py new file mode 100644 index 00000000..0ea68fc4 --- /dev/null +++ b/src/tetra_rp/core/utils/rich_ui.py @@ -0,0 +1,442 @@ +""" +Rich UI components for enhanced logging and progress display in Tetra. +This module provides Rich-based alternatives to standard logging output. +""" + +import builtins +import logging +import os +import time +from contextlib import contextmanager +from typing import Optional, Dict, Any, Generator, List, Union +from enum import Enum + +try: + from rich.console import Console + from rich.logging import RichHandler + from rich.panel import Panel + from rich.table import Table + from rich.status import Status + RICH_AVAILABLE = True +except ImportError: + RICH_AVAILABLE = False + # Create dummy classes for type hints when Rich is not available + class Console: pass + class Status: pass + + +class TetraStatus(str, Enum): + """Status types for visual styling""" + READY = "READY" + INITIALIZING = "INITIALIZING" + RUNNING = "RUNNING" + COMPLETED = "COMPLETED" + FAILED = "FAILED" + CANCELLED = "CANCELLED" + THROTTLED = "THROTTLED" + UNHEALTHY = "UNHEALTHY" + UNKNOWN = "UNKNOWN" + + +def is_rich_enabled() -> bool: + """Check if Rich UI should be enabled based on environment and availability""" + return ( + RICH_AVAILABLE and + os.environ.get("TETRA_RICH_UI", "false").lower() in ("true", "1", "yes") + ) + + +class RichUIManager: + """Central manager for Rich UI components""" + + def __init__(self): + self.console = Console() if RICH_AVAILABLE else None + self._enabled = is_rich_enabled() + self._captured_prints: List[str] = [] + self._original_print = None + self._print_capturing = False + + @property + def enabled(self) -> bool: + return self._enabled and self.console is not None + + def get_console(self) -> Optional["Console"]: + return self.console if self.enabled else None + + def start_print_capture(self) -> None: + """Start capturing print() calls""" + if not self.enabled or self._print_capturing: + return + + self._original_print = builtins.print + self._captured_prints.clear() + self._print_capturing = True + + def captured_print(*args, **kwargs): + # Convert print arguments to string + output = ' '.join(str(arg) for arg in args) + + # Handle common print kwargs + end = kwargs.get('end', '\n') + sep = kwargs.get('sep', ' ') + if len(args) > 1: + output = sep.join(str(arg) for arg in args) 
+ + self._captured_prints.append(output + end.rstrip()) + + # Also send to original print for fallback scenarios + if not self.enabled: + self._original_print(*args, **kwargs) + + builtins.print = captured_print + + def stop_print_capture(self) -> List[str]: + """Stop capturing and return captured prints""" + if not self._print_capturing: + return [] + + if self._original_print: + builtins.print = self._original_print + + self._print_capturing = False + captured = self._captured_prints.copy() + self._captured_prints.clear() + return captured + + +# Global Rich UI manager instance +rich_ui = RichUIManager() + + +class RichLoggingFilter(logging.Filter): + """Filter to suppress verbose third-party logs when Rich UI is active""" + + def filter(self, record): + # Suppress asyncio selector debug messages + if record.name == "asyncio" and ("selector" in record.getMessage().lower() or "using selector" in record.getMessage().lower()): + return False + # Suppress other verbose third-party logs + if record.levelno <= logging.INFO and record.name.startswith(("urllib3", "requests", "boto3", "botocore")): + return False + # Suppress all DEBUG level logs when Rich UI is active (except errors/warnings) + if record.levelno <= logging.DEBUG: + return False + return True + + +def get_rich_handler() -> logging.Handler: + """Get Rich logging handler if available, otherwise return standard handler""" + if rich_ui.enabled: + handler = RichHandler( + console=rich_ui.console, + show_time=True, + show_path=False, + markup=True, + rich_tracebacks=True + ) + # Add filter to suppress verbose logs when Rich UI is active + handler.addFilter(RichLoggingFilter()) + return handler + else: + # Fallback to standard handler + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter( + "%(asctime)s | %(levelname)-5s | %(message)s" + )) + return handler + + +def get_status_color(status: str) -> str: + """Get Rich color for status""" + if not rich_ui.enabled: + return "" + + color_map = { + "READY": "green", + "COMPLETED": "green", + "RUNNING": "blue", + "IN_QUEUE": "cyan", + "INITIALIZING": "yellow", + "FAILED": "red", + "CANCELLED": "red", + "THROTTLED": "orange3", + "UNHEALTHY": "red", + "UNKNOWN": "dim" + } + return color_map.get(status.upper(), "white") + + +def format_status_text(status: str, message: str = "") -> str: + """Format status text with color if Rich is enabled""" + if not rich_ui.enabled: + return f"{status}: {message}" if message else status + + color = get_status_color(status) + formatted_status = f"[{color}]{status}[/{color}]" + return f"{formatted_status}: {message}" if message else formatted_status + + +def create_deployment_panel(endpoint_name: str, endpoint_id: str, console_url: str) -> Union[str, "Panel"]: + """Create a deployment summary panel""" + if not rich_ui.enabled or not RICH_AVAILABLE: + return f"Deployed: {endpoint_name} ({endpoint_id})" + + table = Table.grid(padding=1) + table.add_column(style="bold cyan", no_wrap=True) + table.add_column() + + table.add_row("Endpoint:", f"[green]{endpoint_name}[/green]") + table.add_row("ID:", f"[dim]{endpoint_id}[/dim]") + table.add_row("Console:", f"[link={console_url}]{console_url}[/link]") + + panel = Panel( + table, + title="[bold green]🚀 Deployment Successful[/bold green]", + border_style="green" + ) + return panel + + +def create_reused_resource_panel(endpoint_name: str, endpoint_id: str, console_url: str) -> Union[str, "Panel"]: + """Create a panel for reused existing resources""" + if not rich_ui.enabled or not RICH_AVAILABLE: + 
return f"Reusing: {endpoint_name} ({endpoint_id})" + + table = Table.grid(padding=1) + table.add_column(style="bold cyan", no_wrap=True) + table.add_column() + + table.add_row("Endpoint:", f"[blue]{endpoint_name}[/blue]") + table.add_row("ID:", f"[dim]{endpoint_id}[/dim]") + table.add_row("Console:", f"[link={console_url}]{console_url}[/link]") + + panel = Panel( + table, + title="[bold blue]♻️ Using Existing Resource[/bold blue]", + border_style="blue" + ) + return panel + + +def create_metrics_table(delay_time: int, execution_time: int, worker_id: str) -> Union[str, "Panel"]: + """Create a metrics display table""" + if not rich_ui.enabled or not RICH_AVAILABLE: + return f"Worker:{worker_id} | Delay: {delay_time}ms | Execution: {execution_time}ms" + + table = Table(show_header=True, header_style="bold magenta") + table.add_column("Metric", style="cyan", no_wrap=True) + table.add_column("Value", style="green") + + table.add_row("Worker ID", f"[dim]{worker_id}[/dim]") + table.add_row("Delay Time", f"{delay_time:,} ms") + table.add_row("Execution Time", f"{execution_time:,} ms") + + return Panel( + table, + title="[bold blue]📊 Job Metrics[/bold blue]", + border_style="blue" + ) + + +@contextmanager +def job_progress_tracker(job_id: str, endpoint_name: str) -> Generator[Optional["JobProgressTracker"], None, None]: + """Context manager for tracking job progress with Rich UI""" + if rich_ui.enabled and rich_ui.console is not None: + tracker = JobProgressTracker(job_id, endpoint_name, rich_ui.console) + try: + yield tracker + finally: + tracker.stop() + else: + yield None + + +class JobProgressTracker: + """Tracks job progress with Rich live status display""" + + def __init__(self, job_id: str, endpoint_name: str, console: "Console"): + self.job_id = job_id + self.endpoint_name = endpoint_name + self.console = console + self.start_time = time.time() + self.last_status = "UNKNOWN" + self.attempt_count = 0 + self.current_status_display = None + self.status_printed = False + + def update_status(self, status: str, message: str = "") -> None: + """Update the job status display""" + if not rich_ui.enabled or not RICH_AVAILABLE: + return + + elapsed = int(time.time() - self.start_time) + + if status != self.last_status: + # Clean up previous status display + if self.current_status_display: + self.current_status_display.stop() + self.current_status_display = None + + self.attempt_count = 0 + self.last_status = status + self.status_printed = False + + # Handle different status types + if status in ["IN_QUEUE", "INITIALIZING", "RUNNING"]: + # Choose appropriate emoji and spinner based on status + if status == "IN_QUEUE": + emoji = "⏳" + spinner = "simpleDotsScrolling" + elif status == "INITIALIZING": + emoji = "⚡" + spinner = "dots12" + else: # RUNNING + emoji = "🏁" + spinner = "dots" + + status_text = f"[{get_status_color(status)}]{status}[/{get_status_color(status)}]" + full_message = f"{emoji} {status_text}" + if message: + full_message += f" {message}" + + # Create live status display + self.current_status_display = Status( + full_message, + spinner=spinner, + console=self.console + ) + self.current_status_display.start() + else: + # Final status - print and finish + color = get_status_color(status) + self.console.print( + f"[{color}]●[/{color}] Job {self.job_id}: {status} ({elapsed}s)" + ) + else: + self.attempt_count += 1 + + def show_progress_indicator(self) -> None: + """Update the live status with progress indication""" + if not rich_ui.enabled or not self.current_status_display: + return + + # 
For live status, the spinner handles the animation automatically + # We can optionally update the message to show progress + if self.attempt_count > 0 and self.attempt_count % 5 == 0: # Every 5th attempt + elapsed = int(time.time() - self.start_time) + status_text = f"[{get_status_color(self.last_status)}]{self.last_status}[/{get_status_color(self.last_status)}]" + + if self.last_status == "IN_QUEUE": + emoji = "🕑" + message = f"Waiting for worker... ({elapsed}s)" + elif self.last_status == "INITIALIZING": + emoji = "⚡" + message = f"Starting up worker... ({elapsed}s)" + else: # RUNNING + emoji = "⚙️" + message = f"Executing function... ({elapsed}s)" + + full_message = f"{emoji} {status_text} {message}" + self.current_status_display.update(full_message) + + def stop(self) -> None: + """Stop the status display""" + if self.current_status_display: + self.current_status_display.stop() + self.current_status_display = None + + +def print_with_rich(message: Any, style: str = "") -> None: + """Print message with Rich styling if available""" + if rich_ui.enabled and rich_ui.console: + rich_ui.console.print(message, style=style) + else: + # Strip Rich markup for plain output if it's a string + if isinstance(message, str): + import re + clean_message = re.sub(r'\[/?[^\]]*\]', '', message) + print(clean_message) + else: + print(message) + + +def create_health_display(health_data: Dict[str, Any]) -> Union[str, "Panel"]: + """Create health status display""" + if not rich_ui.enabled or not RICH_AVAILABLE: + return f"Health: {health_data}" + + table = Table(show_header=True, header_style="bold cyan") + table.add_column("Status", style="bold") + table.add_column("Count", justify="right", style="green") + + workers = health_data.get("workers", {}) + for status, count in workers.items(): + if count > 0: + color = get_status_color(status) + table.add_row(f"[{color}]{status.title()}[/{color}]", str(count)) + + return Panel( + table, + title="[bold yellow]🏥 Endpoint Health[/bold yellow]", + border_style="yellow" + ) + + +def create_user_output_panel(output_lines: List[str], source: str = "Local") -> Union[str, "Panel"]: + """Create a panel for user print() output""" + if not rich_ui.enabled or not RICH_AVAILABLE or not output_lines: + return "" + + # Filter out empty lines and format content + content_lines = [line for line in output_lines if line.strip()] + + if not content_lines: + return "" + + # Create the content text + content = '\n'.join(content_lines) + + # Choose icon and color based on source + if source.lower() == "remote": + icon = "🔧" + border_color = "blue" + title_color = "bold blue" + else: + icon = "💬" + border_color = "green" + title_color = "bold green" + + return Panel( + content, + title=f"[{title_color}]{icon} {source} Output[/{title_color}]", + border_style=border_color, + padding=(0, 1) + ) + + +def display_remote_output(stdout_lines: List[str]) -> None: + """Display remote function output in a Rich panel""" + if not rich_ui.enabled or not stdout_lines: + return + + panel = create_user_output_panel(stdout_lines, "Remote") + if panel: + print_with_rich(panel) + + +@contextmanager +def capture_local_prints() -> Generator[None, None, None]: + """Context manager to capture and display local print() calls""" + if not rich_ui.enabled: + yield + return + + rich_ui.start_print_capture() + try: + yield + finally: + captured = rich_ui.stop_print_capture() + if captured: + panel = create_user_output_panel(captured, "Local") + if panel: + print_with_rich(panel) \ No newline at end of file diff 
--git a/src/tetra_rp/logger.py b/src/tetra_rp/logger.py index 1fc86a3d..4bdcc3f5 100644 --- a/src/tetra_rp/logger.py +++ b/src/tetra_rp/logger.py @@ -4,28 +4,45 @@ import sys from typing import Union, Optional +# Import Rich UI components with fallback +try: + from .core.utils.rich_ui import get_rich_handler, is_rich_enabled + RICH_UI_AVAILABLE = True +except ImportError: + RICH_UI_AVAILABLE = False + def setup_logging( level: Union[int, str] = logging.INFO, stream=sys.stdout, fmt: Optional[str] = None ): """ Sets up the root logger with a stream handler and basic formatting. + Uses Rich handler if available and enabled, otherwise falls back to standard logging. Does nothing if handlers are already configured. """ if isinstance(level, str): level = getattr(logging, level.upper(), logging.INFO) - if fmt is None: - if level == logging.DEBUG: - fmt = "%(asctime)s | %(levelname)-5s | %(name)s | %(filename)s:%(lineno)d | %(message)s" - else: - # Default format for INFO level and above - fmt = "%(asctime)s | %(levelname)-5s | %(message)s" - root_logger = logging.getLogger() if not root_logger.hasHandlers(): - handler = logging.StreamHandler(stream) - handler.setFormatter(logging.Formatter(fmt)) + # Use Rich handler if available and enabled + if RICH_UI_AVAILABLE and is_rich_enabled(): + handler = get_rich_handler() + # When Rich UI is enabled, reduce log verbosity to focus on Rich output + if level <= logging.INFO: + level = logging.WARNING # Only show warnings and errors + else: + # Fallback to standard handler + if fmt is None: + if level == logging.DEBUG: + fmt = "%(asctime)s | %(levelname)-5s | %(name)s | %(filename)s:%(lineno)d | %(message)s" + else: + # Default format for INFO level and above + fmt = "%(asctime)s | %(levelname)-5s | %(message)s" + + handler = logging.StreamHandler(stream) + handler.setFormatter(logging.Formatter(fmt)) + root_logger.setLevel(level) root_logger.addHandler(handler) @@ -33,6 +50,19 @@ def setup_logging( env_level = os.environ.get("LOG_LEVEL") if env_level: root_logger.setLevel(env_level.upper()) + elif RICH_UI_AVAILABLE and is_rich_enabled(): + # Suppress routine logs when Rich UI is active, unless explicitly overridden + if not os.environ.get("LOG_LEVEL"): + root_logger.setLevel(logging.WARNING) + # Also suppress specific noisy loggers + for logger_name in ["tetra_rp", "serverless", "resource_manager", "LiveServerlessStub", "asyncio"]: + specific_logger = logging.getLogger(logger_name) + specific_logger.setLevel(logging.WARNING) + + # Add global filter to catch any remaining debug messages + from .core.utils.rich_ui import RichLoggingFilter + global_filter = RichLoggingFilter() + root_logger.addFilter(global_filter) def get_logger(name: Optional[str] = None) -> logging.Logger: diff --git a/src/tetra_rp/stubs/live_serverless.py b/src/tetra_rp/stubs/live_serverless.py index 3261c981..9f3f7a42 100644 --- a/src/tetra_rp/stubs/live_serverless.py +++ b/src/tetra_rp/stubs/live_serverless.py @@ -6,6 +6,7 @@ import traceback import cloudpickle from tetra_rp import get_logger +from tetra_rp.core.utils.rich_ui import display_remote_output, is_rich_enabled from ..core.resources import LiveServerless from ..protos.remote_execution import ( FunctionRequest, @@ -94,8 +95,14 @@ def handle_response(self, response: FunctionResponse): raise ValueError("Invalid response from server") if response.stdout: - for line in response.stdout.splitlines(): - log.info(f"Remote | {line}") + stdout_lines = response.stdout.splitlines() + if is_rich_enabled(): + # Display remote output in Rich 
panel + display_remote_output(stdout_lines) + else: + # Fallback to standard logging + for line in stdout_lines: + log.info(f"Remote | {line}") if response.success: if response.result is None: diff --git a/uv.lock b/uv.lock index 59d81f83..181feec8 100644 --- a/uv.lock +++ b/uv.lock @@ -1922,9 +1922,15 @@ source = { editable = "." } dependencies = [ { name = "cloudpickle" }, { name = "python-dotenv" }, + { name = "rich" }, { name = "runpod" }, ] +[package.optional-dependencies] +rich = [ + { name = "rich" }, +] + [package.dev-dependencies] dev = [ { name = "ruff" }, @@ -1934,8 +1940,11 @@ dev = [ requires-dist = [ { name = "cloudpickle", specifier = ">=3.1.1" }, { name = "python-dotenv", specifier = ">=1.0.0" }, + { name = "rich", specifier = ">=14.0.0" }, + { name = "rich", marker = "extra == 'rich'", specifier = ">=13.0.0" }, { name = "runpod", specifier = "~=1.7.9" }, ] +provides-extras = ["rich"] [package.metadata.requires-dev] dev = [{ name = "ruff", specifier = ">=0.11.9" }] From b9555995b7193f7110f0bc61850071412577e12e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Thu, 26 Jun 2025 12:51:43 -0700 Subject: [PATCH 2/7] cleanup --- src/tetra_rp/client.py | 1 - src/tetra_rp/core/resources/resource_manager.py | 5 ++--- src/tetra_rp/logger.py | 16 ---------------- src/tetra_rp/stubs/live_serverless.py | 2 +- 4 files changed, 3 insertions(+), 21 deletions(-) diff --git a/src/tetra_rp/client.py b/src/tetra_rp/client.py index 641f4c02..ddce7d83 100644 --- a/src/tetra_rp/client.py +++ b/src/tetra_rp/client.py @@ -1,6 +1,5 @@ import logging from functools import wraps -from tetra_rp import get_logger from tetra_rp.core.utils.rich_ui import capture_local_prints from typing import List from .core.resources import ServerlessResource, ResourceManager diff --git a/src/tetra_rp/core/resources/resource_manager.py b/src/tetra_rp/core/resources/resource_manager.py index 19667ffa..f8a54224 100644 --- a/src/tetra_rp/core/resources/resource_manager.py +++ b/src/tetra_rp/core/resources/resource_manager.py @@ -3,9 +3,8 @@ from typing import Dict from pathlib import Path -from tetra_rp import get_logger -from tetra_rp.core.utils.singleton import SingletonMixin -from tetra_rp.core.utils.rich_ui import is_rich_enabled, create_reused_resource_panel, print_with_rich +from ..utils.singleton import SingletonMixin +from ..utils.rich_ui import is_rich_enabled, create_reused_resource_panel, print_with_rich from .base import DeployableResource diff --git a/src/tetra_rp/logger.py b/src/tetra_rp/logger.py index 5283b23f..4cc40c38 100644 --- a/src/tetra_rp/logger.py +++ b/src/tetra_rp/logger.py @@ -62,19 +62,3 @@ def setup_logging( from .core.utils.rich_ui import RichLoggingFilter global_filter = RichLoggingFilter() root_logger.addFilter(global_filter) - - -def get_logger(name: Optional[str] = None) -> logging.Logger: - """ - Returns a logger. If no name is provided, it infers the caller's module name. - """ - if name is None: - # Get the caller's module name. 
- frame = inspect.stack()[1] - module = inspect.getmodule(frame[0]) - name = module.__name__ if module else "__main__" - - return logging.getLogger(name) - - -setup_logging(os.environ.get("LOG_LEVEL", "INFO")) diff --git a/src/tetra_rp/stubs/live_serverless.py b/src/tetra_rp/stubs/live_serverless.py index d7e494dc..f1f9af0a 100644 --- a/src/tetra_rp/stubs/live_serverless.py +++ b/src/tetra_rp/stubs/live_serverless.py @@ -5,7 +5,7 @@ import hashlib import traceback import cloudpickle -from tetra_rp import get_logger +import logging from tetra_rp.core.utils.rich_ui import display_remote_output, is_rich_enabled from ..core.resources import LiveServerless from ..protos.remote_execution import ( From ac98c046256481f6032bb6b375cbd36debbe6c0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Thu, 26 Jun 2025 12:53:33 -0700 Subject: [PATCH 3/7] cleanup --- src/tetra_rp/core/resources/serverless.py | 46 +++++++++++------------ 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/src/tetra_rp/core/resources/serverless.py b/src/tetra_rp/core/resources/serverless.py index f27f0123..b3fb156b 100644 --- a/src/tetra_rp/core/resources/serverless.py +++ b/src/tetra_rp/core/resources/serverless.py @@ -3,16 +3,12 @@ import os from typing import Any, Dict, List, Optional from enum import Enum -from urllib.parse import urljoin from pydantic import field_serializer, field_validator, model_validator, BaseModel, Field -from tetra_rp import get_logger -from tetra_rp.core.utils.backoff import get_backoff_delay from tetra_rp.core.utils.rich_ui import ( job_progress_tracker, create_deployment_panel, create_metrics_table, - format_status_text, print_with_rich, is_rich_enabled ) @@ -216,7 +212,7 @@ async def deploy(self) -> "DeployableResource": if endpoint := self.__class__(**result): if is_rich_enabled(): - panel = create_deployment_panel(endpoint.name, endpoint.id, endpoint.console_url) + panel = create_deployment_panel(endpoint.name, endpoint.id, endpoint.url) print_with_rich(panel) else: log.info(f"Deployed: {endpoint}") @@ -339,7 +335,7 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput": # Use Rich progress tracker if available with job_progress_tracker(job.job_id, self.name) as tracker: if not is_rich_enabled(): - log.info(f"{log_group} | Started {log_subgroup}") + log.info(f"{self} | Started {log_subgroup}") elif tracker: # Initialize the progress tracker with starting status tracker.update_status("IN_QUEUE", "Job submitted, waiting for worker...") @@ -353,25 +349,25 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput": while True: await asyncio.sleep(current_pace) - if await self.is_ready_for_requests(): - # Check job status - job_status = await asyncio.to_thread(job.status) - - if last_status == job_status: - # nothing changed, increase the gap - attempt += 1 - indicator = "." * (attempt // 2) if attempt % 2 == 0 else "" - if indicator: - log.info(f"{log_subgroup} | {indicator}") - else: - # status changed, reset the gap - log.info(f"{log_subgroup} | Status: {job_status}") - attempt = 0 - - last_status = job_status - - # Adjust polling pace appropriately - current_pace = get_backoff_delay(attempt) + if await self.is_ready_for_requests(): + # Check job status + job_status = await asyncio.to_thread(job.status) + + if last_status == job_status: + # nothing changed, increase the gap + attempt += 1 + indicator = "." 
* (attempt // 2) if attempt % 2 == 0 else "" + if indicator: + log.info(f"{log_subgroup} | {indicator}") + else: + # status changed, reset the gap + log.info(f"{log_subgroup} | Status: {job_status}") + attempt = 0 + + last_status = job_status + + # Adjust polling pace appropriately + current_pace = get_backoff_delay(attempt) if job_status in ("COMPLETED", "FAILED", "CANCELLED"): if tracker: From f7aa196283c90e25e613e9179a92225964cdc7c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Thu, 26 Jun 2025 13:59:24 -0700 Subject: [PATCH 4/7] chore: more UI refinements --- src/tetra_rp/core/api/runpod.py | 17 +++++-- .../core/resources/resource_manager.py | 11 +++-- src/tetra_rp/core/resources/serverless.py | 29 +++++++----- src/tetra_rp/core/utils/rich_ui.py | 46 +++++++++++++++++-- 4 files changed, 82 insertions(+), 21 deletions(-) diff --git a/src/tetra_rp/core/api/runpod.py b/src/tetra_rp/core/api/runpod.py index a4138099..82151a4a 100644 --- a/src/tetra_rp/core/api/runpod.py +++ b/src/tetra_rp/core/api/runpod.py @@ -8,9 +8,12 @@ import aiohttp from typing import Dict, Any, Optional import logging +from ..utils.rich_ui import format_endpoint_created + log = logging.getLogger(__name__) + RUNPOD_API_BASE_URL = os.environ.get("RUNPOD_API_BASE_URL", "https://api.runpod.io") @@ -122,9 +125,17 @@ async def create_endpoint(self, input_data: Dict[str, Any]) -> Dict[str, Any]: raise Exception("Unexpected GraphQL response structure") endpoint_data = result["saveEndpoint"] - log.info( - f"Created endpoint: {endpoint_data.get('id', 'unknown')} - {endpoint_data.get('name', 'unnamed')}" - ) + + # Use Rich formatting if available + try: + format_endpoint_created( + endpoint_data.get('id', 'unknown'), + endpoint_data.get('name', 'unnamed') + ) + except ImportError: + log.info( + f"Created endpoint: {endpoint_data.get('id', 'unknown')} - {endpoint_data.get('name', 'unnamed')}" + ) return endpoint_data diff --git a/src/tetra_rp/core/resources/resource_manager.py b/src/tetra_rp/core/resources/resource_manager.py index f8a54224..19d6d1b5 100644 --- a/src/tetra_rp/core/resources/resource_manager.py +++ b/src/tetra_rp/core/resources/resource_manager.py @@ -4,7 +4,12 @@ from pathlib import Path from ..utils.singleton import SingletonMixin -from ..utils.rich_ui import is_rich_enabled, create_reused_resource_panel, print_with_rich +from ..utils.rich_ui import ( + create_reused_resource_panel, + format_console_url, + is_rich_enabled, + print_with_rich, +) from .base import DeployableResource @@ -73,14 +78,14 @@ async def get_or_deploy_resource( if is_rich_enabled(): # Show a panel for reused resources similar to fresh deployments - panel = create_reused_resource_panel(existing.name, existing.id, existing.console_url) + panel = create_reused_resource_panel(existing.name, existing.id, existing.url) print_with_rich(panel) else: log.debug(f"{existing} exists, reusing.") return existing if deployed_resource := await config.deploy(): - log.info(f"URL: {deployed_resource.url}") + format_console_url(deployed_resource.url) self.add_resource(uid, deployed_resource) return deployed_resource diff --git a/src/tetra_rp/core/resources/serverless.py b/src/tetra_rp/core/resources/serverless.py index b3fb156b..4fcdaec7 100644 --- a/src/tetra_rp/core/resources/serverless.py +++ b/src/tetra_rp/core/resources/serverless.py @@ -23,6 +23,8 @@ from .gpu import GpuGroup from .cpu import CpuInstanceType from .environment import EnvironmentVars +from ..utils.rich_ui import rich_ui, format_api_info, format_job_status + # 
Environment variables are loaded from the .env file def get_env_vars() -> Dict[str, str]: @@ -143,7 +145,7 @@ def validate_gpus(cls, value: List[GpuGroup]) -> List[GpuGroup]: @model_validator(mode="after") def sync_input_fields(self): """Sync between temporary inputs and exported fields""" - if self.flashboot: + if self.flashboot and not self.name.endswith("-fb"): self.name += "-fb" if self.instanceIds: @@ -212,7 +214,7 @@ async def deploy(self) -> "DeployableResource": if endpoint := self.__class__(**result): if is_rich_enabled(): - panel = create_deployment_panel(endpoint.name, endpoint.id, endpoint.url) + panel = create_deployment_panel(endpoint.name, endpoint.id or "", endpoint.url) print_with_rich(panel) else: log.info(f"Deployed: {endpoint}") @@ -261,9 +263,10 @@ async def is_ready_for_requests(self, give_up_threshold=10) -> bool: else: # nothing changed, increase the gap attempt += 1 - indicator = "." * (attempt // 2) if attempt % 2 == 0 else "" - if indicator: - log.info(f"{self} | {indicator}") + if not rich_ui.enabled: + indicator = "." * (attempt // 2) if attempt % 2 == 0 else "" + if indicator: + log.info(f"{self} | {indicator}") status = health.workers.status if status in [ @@ -299,7 +302,7 @@ def _fetch_job(): # Poll until requests can be sent await self.is_ready_for_requests() - log.info(f"{self} | API /run_sync") + format_api_info(str(self), self.id or "", "/run_sync") response = await asyncio.to_thread(_fetch_job) return JobOutput(**response) @@ -327,7 +330,7 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput": await self.is_ready_for_requests() # Create a job using the endpoint - log.info(f"{self} | API /run") + format_api_info(str(self), self.id or "", "/run") job = await asyncio.to_thread(self.endpoint.run, request_input=payload) log_subgroup = f"Job:{job.job_id}" @@ -356,12 +359,16 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput": if last_status == job_status: # nothing changed, increase the gap attempt += 1 - indicator = "." * (attempt // 2) if attempt % 2 == 0 else "" - if indicator: - log.info(f"{log_subgroup} | {indicator}") + if tracker: + tracker.show_progress_indicator() + else: + if not rich_ui.enabled: + indicator = "." 
* (attempt // 2) if attempt % 2 == 0 else "" + if indicator: + log.info(f"{log_subgroup} | {indicator}") else: # status changed, reset the gap - log.info(f"{log_subgroup} | Status: {job_status}") + format_job_status(job.job_id, job_status) attempt = 0 last_status = job_status diff --git a/src/tetra_rp/core/utils/rich_ui.py b/src/tetra_rp/core/utils/rich_ui.py index 0ea68fc4..c35ea7d9 100644 --- a/src/tetra_rp/core/utils/rich_ui.py +++ b/src/tetra_rp/core/utils/rich_ui.py @@ -21,8 +21,11 @@ except ImportError: RICH_AVAILABLE = False # Create dummy classes for type hints when Rich is not available - class Console: pass - class Status: pass + class Console: + pass + + class Status: + pass class TetraStatus(str, Enum): @@ -42,7 +45,8 @@ def is_rich_enabled() -> bool: """Check if Rich UI should be enabled based on environment and availability""" return ( RICH_AVAILABLE and - os.environ.get("TETRA_RICH_UI", "false").lower() in ("true", "1", "yes") + os.environ.get("LOG_LEVEL", "INFO").upper() == "INFO" + # os.environ.get("TETRA_RICH_UI", "false").lower() in ("true", "1", "yes") ) @@ -439,4 +443,38 @@ def capture_local_prints() -> Generator[None, None, None]: if captured: panel = create_user_output_panel(captured, "Local") if panel: - print_with_rich(panel) \ No newline at end of file + print_with_rich(panel) + + +def format_api_info(endpoint_name: str, endpoint_id: str, api_path: str) -> None: + """Display API endpoint info in Rich format""" + if rich_ui.enabled and rich_ui.console: + rich_ui.console.print(f"🔗 [cyan]{endpoint_name}[/cyan]:[dim]{endpoint_id}[/dim] [green]→[/green] [bold yellow]{api_path}[/bold yellow]") + else: + print(f"{endpoint_name}:{endpoint_id} | API {api_path}") + + +def format_job_status(job_id: str, status: str) -> None: + """Display job status in Rich format""" + if rich_ui.enabled and rich_ui.console: + color = get_status_color(status) + short_job_id = job_id.split('-')[0] if '-' in job_id else job_id[:8] + rich_ui.console.print(f"⚡ Job [dim]{short_job_id}[/dim]: [{color}]{status}[/{color}]") + else: + print(f"Job:{job_id} | Status: {status}") + + +def format_console_url(url: str) -> None: + """Display console URL in Rich format""" + if rich_ui.enabled and rich_ui.console: + rich_ui.console.print(f"🌐 [link={url}]Console Dashboard[/link]") + else: + print(f"URL: {url}") + + +def format_endpoint_created(endpoint_id: str, endpoint_name: str) -> None: + """Display endpoint creation in Rich format""" + if rich_ui.enabled and rich_ui.console: + rich_ui.console.print(f"✨ Created endpoint [cyan]{endpoint_name}[/cyan] [dim]({endpoint_id})[/dim]") + else: + print(f"Created endpoint: {endpoint_id} - {endpoint_name}") \ No newline at end of file From c85e963e6d01a3dbdf9f87e2b6d2a57fca4ec144 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Thu, 26 Jun 2025 14:00:10 -0700 Subject: [PATCH 5/7] chore: cleanup --- src/tetra_rp/client.py | 1 - src/tetra_rp/core/resources/environment.py | 1 - src/tetra_rp/protos/remote_execution.py | 2 +- tetra-examples | 2 +- uv.lock | 2 +- 5 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/tetra_rp/client.py b/src/tetra_rp/client.py index ddce7d83..09fd8d7e 100644 --- a/src/tetra_rp/client.py +++ b/src/tetra_rp/client.py @@ -1,6 +1,5 @@ import logging from functools import wraps -from tetra_rp.core.utils.rich_ui import capture_local_prints from typing import List from .core.resources import ServerlessResource, ResourceManager from .stubs import stub_resource diff --git a/src/tetra_rp/core/resources/environment.py 
b/src/tetra_rp/core/resources/environment.py index 0301aab4..6b81e15f 100644 --- a/src/tetra_rp/core/resources/environment.py +++ b/src/tetra_rp/core/resources/environment.py @@ -1,6 +1,5 @@ from typing import Dict, Optional from dotenv import dotenv_values -import os class EnvironmentVars: def __init__(self): diff --git a/src/tetra_rp/protos/remote_execution.py b/src/tetra_rp/protos/remote_execution.py index 2c16941f..ab988e2f 100644 --- a/src/tetra_rp/protos/remote_execution.py +++ b/src/tetra_rp/protos/remote_execution.py @@ -1,7 +1,7 @@ # TODO: generate using betterproto from abc import ABC, abstractmethod -from typing import List, Dict, Optional, Union +from typing import List, Dict, Optional from pydantic import BaseModel, Field diff --git a/tetra-examples b/tetra-examples index 718267c1..e804315e 160000 --- a/tetra-examples +++ b/tetra-examples @@ -1 +1 @@ -Subproject commit 718267c156e74c070ec704f7dc641d34d0752da8 +Subproject commit e804315ee8712c73559086ff394b63a71be144d4 diff --git a/uv.lock b/uv.lock index a3ddf539..cdd27485 100644 --- a/uv.lock +++ b/uv.lock @@ -1962,7 +1962,7 @@ wheels = [ [[package]] name = "tetra-rp" -version = "0.3.0" +version = "0.4.2" source = { editable = "." } dependencies = [ { name = "cloudpickle" }, From c108683d15186f0d41c83b0c8512874da5bf1ab8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Tue, 8 Jul 2025 09:20:17 -0700 Subject: [PATCH 6/7] fix: ruff cleanup --- Makefile | 6 + src/tetra_rp/__init__.py | 2 +- src/tetra_rp/core/api/runpod.py | 5 +- .../core/resources/resource_manager.py | 4 +- src/tetra_rp/core/resources/serverless.py | 31 ++- src/tetra_rp/core/utils/rich_ui.py | 202 ++++++++++-------- src/tetra_rp/logger.py | 16 +- 7 files changed, 158 insertions(+), 108 deletions(-) diff --git a/Makefile b/Makefile index ee7bb8eb..9d397b30 100644 --- a/Makefile +++ b/Makefile @@ -24,3 +24,9 @@ clean: build: clean dev uv build + +check: dev + uv run ruff check src/ tests/ || true + uv run ruff format --check src/ tests/ || true + uv run mypy src/ --ignore-missing-imports || true + uv run pytest tests/ -vv || true diff --git a/src/tetra_rp/__init__.py b/src/tetra_rp/__init__.py index 98627520..aed3638a 100644 --- a/src/tetra_rp/__init__.py +++ b/src/tetra_rp/__init__.py @@ -21,7 +21,7 @@ ServerlessEndpoint, runpod, ) -from .core.utils.rich_ui import capture_local_prints +from .core.utils.rich_ui import capture_local_prints # noqa: E402 __all__ = [ diff --git a/src/tetra_rp/core/api/runpod.py b/src/tetra_rp/core/api/runpod.py index 152df4c2..0efeb52d 100644 --- a/src/tetra_rp/core/api/runpod.py +++ b/src/tetra_rp/core/api/runpod.py @@ -127,12 +127,11 @@ async def create_endpoint(self, input_data: Dict[str, Any]) -> Dict[str, Any]: raise Exception("Unexpected GraphQL response structure") endpoint_data = result["saveEndpoint"] - + # Use Rich formatting if available try: format_endpoint_created( - endpoint_data.get('id', 'unknown'), - endpoint_data.get('name', 'unnamed') + endpoint_data.get("id", "unknown"), endpoint_data.get("name", "unnamed") ) except ImportError: log.info( diff --git a/src/tetra_rp/core/resources/resource_manager.py b/src/tetra_rp/core/resources/resource_manager.py index 19d6d1b5..34e526bf 100644 --- a/src/tetra_rp/core/resources/resource_manager.py +++ b/src/tetra_rp/core/resources/resource_manager.py @@ -78,7 +78,9 @@ async def get_or_deploy_resource( if is_rich_enabled(): # Show a panel for reused resources similar to fresh deployments - panel = create_reused_resource_panel(existing.name, existing.id, 
existing.url) + panel = create_reused_resource_panel( + existing.name, existing.id, existing.url + ) print_with_rich(panel) else: log.debug(f"{existing} exists, reusing.") diff --git a/src/tetra_rp/core/resources/serverless.py b/src/tetra_rp/core/resources/serverless.py index fd5a181e..54219a04 100644 --- a/src/tetra_rp/core/resources/serverless.py +++ b/src/tetra_rp/core/resources/serverless.py @@ -3,14 +3,20 @@ import os from typing import Any, Dict, List, Optional from enum import Enum -from pydantic import field_serializer, field_validator, model_validator, BaseModel, Field +from pydantic import ( + field_serializer, + field_validator, + model_validator, + BaseModel, + Field, +) from tetra_rp.core.utils.rich_ui import ( - job_progress_tracker, - create_deployment_panel, + job_progress_tracker, + create_deployment_panel, create_metrics_table, print_with_rich, - is_rich_enabled + is_rich_enabled, ) from runpod.endpoint.runner import Job @@ -26,7 +32,6 @@ from ..utils.rich_ui import rich_ui, format_api_info, format_job_status - # Environment variables are loaded from the .env file def get_env_vars() -> Dict[str, str]: """ @@ -215,7 +220,9 @@ async def deploy(self) -> "DeployableResource": if endpoint := self.__class__(**result): if is_rich_enabled(): - panel = create_deployment_panel(endpoint.name, endpoint.id or "", endpoint.url) + panel = create_deployment_panel( + endpoint.name, endpoint.id or "", endpoint.url + ) print_with_rich(panel) else: log.info(f"Deployed: {endpoint}") @@ -342,7 +349,9 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput": log.info(f"{self} | Started {log_subgroup}") elif tracker: # Initialize the progress tracker with starting status - tracker.update_status("IN_QUEUE", "Job submitted, waiting for worker...") + tracker.update_status( + "IN_QUEUE", "Job submitted, waiting for worker..." + ) current_pace = 0 attempt = 0 @@ -364,7 +373,9 @@ async def run(self, payload: Dict[str, Any]) -> "JobOutput": tracker.show_progress_indicator() else: if not rich_ui.enabled: - indicator = "." * (attempt // 2) if attempt % 2 == 0 else "" + indicator = ( + "." 
* (attempt // 2) if attempt % 2 == 0 else "" + ) if indicator: log.info(f"{log_subgroup} | {indicator}") else: @@ -443,7 +454,9 @@ class JobOutput(BaseModel): def model_post_init(self, __context): if is_rich_enabled(): - metrics_table = create_metrics_table(self.delayTime, self.executionTime, self.workerId) + metrics_table = create_metrics_table( + self.delayTime, self.executionTime, self.workerId + ) print_with_rich(metrics_table) else: log_group = f"Worker:{self.workerId}" diff --git a/src/tetra_rp/core/utils/rich_ui.py b/src/tetra_rp/core/utils/rich_ui.py index c35ea7d9..d7729cc3 100644 --- a/src/tetra_rp/core/utils/rich_ui.py +++ b/src/tetra_rp/core/utils/rich_ui.py @@ -17,21 +17,24 @@ from rich.panel import Panel from rich.table import Table from rich.status import Status + RICH_AVAILABLE = True except ImportError: RICH_AVAILABLE = False + # Create dummy classes for type hints when Rich is not available class Console: pass - + class Status: pass class TetraStatus(str, Enum): """Status types for visual styling""" + READY = "READY" - INITIALIZING = "INITIALIZING" + INITIALIZING = "INITIALIZING" RUNNING = "RUNNING" COMPLETED = "COMPLETED" FAILED = "FAILED" @@ -44,64 +47,63 @@ class TetraStatus(str, Enum): def is_rich_enabled() -> bool: """Check if Rich UI should be enabled based on environment and availability""" return ( - RICH_AVAILABLE and - os.environ.get("LOG_LEVEL", "INFO").upper() == "INFO" + RICH_AVAILABLE and os.environ.get("LOG_LEVEL", "INFO").upper() == "INFO" # os.environ.get("TETRA_RICH_UI", "false").lower() in ("true", "1", "yes") ) class RichUIManager: """Central manager for Rich UI components""" - + def __init__(self): self.console = Console() if RICH_AVAILABLE else None self._enabled = is_rich_enabled() self._captured_prints: List[str] = [] self._original_print = None self._print_capturing = False - + @property def enabled(self) -> bool: return self._enabled and self.console is not None - + def get_console(self) -> Optional["Console"]: return self.console if self.enabled else None - + def start_print_capture(self) -> None: """Start capturing print() calls""" if not self.enabled or self._print_capturing: return - + self._original_print = builtins.print self._captured_prints.clear() self._print_capturing = True - + def captured_print(*args, **kwargs): # Convert print arguments to string - output = ' '.join(str(arg) for arg in args) - + output = " ".join(str(arg) for arg in args) + # Handle common print kwargs - end = kwargs.get('end', '\n') - sep = kwargs.get('sep', ' ') + end = kwargs.get("end", "\n") + sep = kwargs.get("sep", " ") if len(args) > 1: output = sep.join(str(arg) for arg in args) - + self._captured_prints.append(output + end.rstrip()) - + # Also send to original print for fallback scenarios if not self.enabled: self._original_print(*args, **kwargs) - + builtins.print = captured_print - + def stop_print_capture(self) -> List[str]: """Stop capturing and return captured prints""" if not self._print_capturing: return [] - + if self._original_print: builtins.print = self._original_print - + self._print_capturing = False captured = self._captured_prints.copy() self._captured_prints.clear() @@ -114,13 +116,18 @@ def stop_print_capture(self) -> List[str]: class RichLoggingFilter(logging.Filter): """Filter to suppress verbose third-party logs when Rich UI is active""" - + def filter(self, record): # Suppress asyncio selector debug messages - if record.name == "asyncio" and ("selector" in record.getMessage().lower() or "using selector" in record.getMessage().lower()): 
+ if record.name == "asyncio" and ( + "selector" in record.getMessage().lower() + or "using selector" in record.getMessage().lower() + ): return False - # Suppress other verbose third-party logs - if record.levelno <= logging.INFO and record.name.startswith(("urllib3", "requests", "boto3", "botocore")): + # Suppress other verbose third-party logs + if record.levelno <= logging.INFO and record.name.startswith( + ("urllib3", "requests", "boto3", "botocore") + ): return False # Suppress all DEBUG level logs when Rich UI is active (except errors/warnings) if record.levelno <= logging.DEBUG: @@ -136,7 +143,7 @@ def get_rich_handler() -> logging.Handler: show_time=True, show_path=False, markup=True, - rich_tracebacks=True + rich_tracebacks=True, ) # Add filter to suppress verbose logs when Rich UI is active handler.addFilter(RichLoggingFilter()) @@ -144,9 +151,9 @@ def get_rich_handler() -> logging.Handler: else: # Fallback to standard handler handler = logging.StreamHandler() - handler.setFormatter(logging.Formatter( - "%(asctime)s | %(levelname)-5s | %(message)s" - )) + handler.setFormatter( + logging.Formatter("%(asctime)s | %(levelname)-5s | %(message)s") + ) return handler @@ -154,18 +161,18 @@ def get_status_color(status: str) -> str: """Get Rich color for status""" if not rich_ui.enabled: return "" - + color_map = { "READY": "green", "COMPLETED": "green", "RUNNING": "blue", "IN_QUEUE": "cyan", - "INITIALIZING": "yellow", + "INITIALIZING": "yellow", "FAILED": "red", "CANCELLED": "red", "THROTTLED": "orange3", "UNHEALTHY": "red", - "UNKNOWN": "dim" + "UNKNOWN": "dim", } return color_map.get(status.upper(), "white") @@ -174,76 +181,82 @@ def format_status_text(status: str, message: str = "") -> str: """Format status text with color if Rich is enabled""" if not rich_ui.enabled: return f"{status}: {message}" if message else status - + color = get_status_color(status) formatted_status = f"[{color}]{status}[/{color}]" return f"{formatted_status}: {message}" if message else formatted_status -def create_deployment_panel(endpoint_name: str, endpoint_id: str, console_url: str) -> Union[str, "Panel"]: +def create_deployment_panel( + endpoint_name: str, endpoint_id: str, console_url: str +) -> Union[str, "Panel"]: """Create a deployment summary panel""" if not rich_ui.enabled or not RICH_AVAILABLE: return f"Deployed: {endpoint_name} ({endpoint_id})" - + table = Table.grid(padding=1) table.add_column(style="bold cyan", no_wrap=True) table.add_column() - + table.add_row("Endpoint:", f"[green]{endpoint_name}[/green]") table.add_row("ID:", f"[dim]{endpoint_id}[/dim]") table.add_row("Console:", f"[link={console_url}]{console_url}[/link]") - + panel = Panel( table, title="[bold green]🚀 Deployment Successful[/bold green]", - border_style="green" + border_style="green", ) return panel -def create_reused_resource_panel(endpoint_name: str, endpoint_id: str, console_url: str) -> Union[str, "Panel"]: +def create_reused_resource_panel( + endpoint_name: str, endpoint_id: str, console_url: str +) -> Union[str, "Panel"]: """Create a panel for reused existing resources""" if not rich_ui.enabled or not RICH_AVAILABLE: return f"Reusing: {endpoint_name} ({endpoint_id})" - + table = Table.grid(padding=1) table.add_column(style="bold cyan", no_wrap=True) table.add_column() - + table.add_row("Endpoint:", f"[blue]{endpoint_name}[/blue]") table.add_row("ID:", f"[dim]{endpoint_id}[/dim]") table.add_row("Console:", f"[link={console_url}]{console_url}[/link]") - + panel = Panel( table, title="[bold blue]♻️ Using Existing 
Resource[/bold blue]", - border_style="blue" + border_style="blue", ) return panel -def create_metrics_table(delay_time: int, execution_time: int, worker_id: str) -> Union[str, "Panel"]: +def create_metrics_table( + delay_time: int, execution_time: int, worker_id: str +) -> Union[str, "Panel"]: """Create a metrics display table""" if not rich_ui.enabled or not RICH_AVAILABLE: return f"Worker:{worker_id} | Delay: {delay_time}ms | Execution: {execution_time}ms" - + table = Table(show_header=True, header_style="bold magenta") table.add_column("Metric", style="cyan", no_wrap=True) table.add_column("Value", style="green") - + table.add_row("Worker ID", f"[dim]{worker_id}[/dim]") table.add_row("Delay Time", f"{delay_time:,} ms") table.add_row("Execution Time", f"{execution_time:,} ms") - + return Panel( - table, - title="[bold blue]📊 Job Metrics[/bold blue]", - border_style="blue" + table, title="[bold blue]📊 Job Metrics[/bold blue]", border_style="blue" ) -@contextmanager -def job_progress_tracker(job_id: str, endpoint_name: str) -> Generator[Optional["JobProgressTracker"], None, None]: +@contextmanager +def job_progress_tracker( + job_id: str, endpoint_name: str +) -> Generator[Optional["JobProgressTracker"], None, None]: """Context manager for tracking job progress with Rich UI""" if rich_ui.enabled and rich_ui.console is not None: tracker = JobProgressTracker(job_id, endpoint_name, rich_ui.console) @@ -257,34 +270,34 @@ def job_progress_tracker(job_id: str, endpoint_name: str) -> Generator[Optional[ class JobProgressTracker: """Tracks job progress with Rich live status display""" - + def __init__(self, job_id: str, endpoint_name: str, console: "Console"): self.job_id = job_id - self.endpoint_name = endpoint_name + self.endpoint_name = endpoint_name self.console = console self.start_time = time.time() self.last_status = "UNKNOWN" self.attempt_count = 0 self.current_status_display = None self.status_printed = False - + def update_status(self, status: str, message: str = "") -> None: """Update the job status display""" if not rich_ui.enabled or not RICH_AVAILABLE: return - + elapsed = int(time.time() - self.start_time) - + if status != self.last_status: # Clean up previous status display if self.current_status_display: self.current_status_display.stop() self.current_status_display = None - + self.attempt_count = 0 self.last_status = status self.status_printed = False - + # Handle different status types if status in ["IN_QUEUE", "INITIALIZING", "RUNNING"]: # Choose appropriate emoji and spinner based on status @@ -297,17 +310,17 @@ def update_status(self, status: str, message: str = "") -> None: else: # RUNNING emoji = "🏁" spinner = "dots" - - status_text = f"[{get_status_color(status)}]{status}[/{get_status_color(status)}]" + + status_text = ( + f"[{get_status_color(status)}]{status}[/{get_status_color(status)}]" + ) full_message = f"{emoji} {status_text}" if message: full_message += f" {message}" - + # Create live status display self.current_status_display = Status( - full_message, - spinner=spinner, - console=self.console + full_message, spinner=spinner, console=self.console ) self.current_status_display.start() else: @@ -318,18 +331,18 @@ def update_status(self, status: str, message: str = "") -> None: ) else: self.attempt_count += 1 - + def show_progress_indicator(self) -> None: """Update the live status with progress indication""" if not rich_ui.enabled or not self.current_status_display: return - + # For live status, the spinner handles the animation automatically # We can optionally 
update the message to show progress if self.attempt_count > 0 and self.attempt_count % 5 == 0: # Every 5th attempt elapsed = int(time.time() - self.start_time) status_text = f"[{get_status_color(self.last_status)}]{self.last_status}[/{get_status_color(self.last_status)}]" - + if self.last_status == "IN_QUEUE": emoji = "🕑" message = f"Waiting for worker... ({elapsed}s)" @@ -339,10 +352,10 @@ def show_progress_indicator(self) -> None: else: # RUNNING emoji = "⚙️" message = f"Executing function... ({elapsed}s)" - + full_message = f"{emoji} {status_text} {message}" self.current_status_display.update(full_message) - + def stop(self) -> None: """Stop the status display""" if self.current_status_display: @@ -358,7 +371,8 @@ def print_with_rich(message: Any, style: str = "") -> None: # Strip Rich markup for plain output if it's a string if isinstance(message, str): import re - clean_message = re.sub(r'\[/?[^\]]*\]', '', message) + + clean_message = re.sub(r"\[/?[^\]]*\]", "", message) print(clean_message) else: print(message) @@ -368,38 +382,40 @@ def create_health_display(health_data: Dict[str, Any]) -> Union[str, "Panel"]: """Create health status display""" if not rich_ui.enabled or not RICH_AVAILABLE: return f"Health: {health_data}" - + table = Table(show_header=True, header_style="bold cyan") table.add_column("Status", style="bold") table.add_column("Count", justify="right", style="green") - + workers = health_data.get("workers", {}) for status, count in workers.items(): if count > 0: - color = get_status_color(status) + color = get_status_color(status) table.add_row(f"[{color}]{status.title()}[/{color}]", str(count)) - + return Panel( table, title="[bold yellow]🏥 Endpoint Health[/bold yellow]", - border_style="yellow" + border_style="yellow", ) -def create_user_output_panel(output_lines: List[str], source: str = "Local") -> Union[str, "Panel"]: +def create_user_output_panel( + output_lines: List[str], source: str = "Local" +) -> Union[str, "Panel"]: """Create a panel for user print() output""" if not rich_ui.enabled or not RICH_AVAILABLE or not output_lines: return "" - + # Filter out empty lines and format content content_lines = [line for line in output_lines if line.strip()] - + if not content_lines: return "" - + # Create the content text - content = '\n'.join(content_lines) - + content = "\n".join(content_lines) + # Choose icon and color based on source if source.lower() == "remote": icon = "🔧" @@ -409,12 +425,12 @@ def create_user_output_panel(output_lines: List[str], source: str = "Local") -> icon = "💬" border_color = "green" title_color = "bold green" - + return Panel( content, title=f"[{title_color}]{icon} {source} Output[/{title_color}]", border_style=border_color, - padding=(0, 1) + padding=(0, 1), ) @@ -422,7 +438,7 @@ def display_remote_output(stdout_lines: List[str]) -> None: """Display remote function output in a Rich panel""" if not rich_ui.enabled or not stdout_lines: return - + panel = create_user_output_panel(stdout_lines, "Remote") if panel: print_with_rich(panel) @@ -434,7 +450,7 @@ def capture_local_prints() -> Generator[None, None, None]: if not rich_ui.enabled: yield return - + rich_ui.start_print_capture() try: yield @@ -449,7 +465,9 @@ def capture_local_prints() -> Generator[None, None, None]: def format_api_info(endpoint_name: str, endpoint_id: str, api_path: str) -> None: """Display API endpoint info in Rich format""" if rich_ui.enabled and rich_ui.console: - rich_ui.console.print(f"🔗 [cyan]{endpoint_name}[/cyan]:[dim]{endpoint_id}[/dim] [green]→[/green] [bold 
yellow]{api_path}[/bold yellow]") + rich_ui.console.print( + f"🔗 [cyan]{endpoint_name}[/cyan]:[dim]{endpoint_id}[/dim] [green]→[/green] [bold yellow]{api_path}[/bold yellow]" + ) else: print(f"{endpoint_name}:{endpoint_id} | API {api_path}") @@ -458,8 +476,10 @@ def format_job_status(job_id: str, status: str) -> None: """Display job status in Rich format""" if rich_ui.enabled and rich_ui.console: color = get_status_color(status) - short_job_id = job_id.split('-')[0] if '-' in job_id else job_id[:8] - rich_ui.console.print(f"⚡ Job [dim]{short_job_id}[/dim]: [{color}]{status}[/{color}]") + short_job_id = job_id.split("-")[0] if "-" in job_id else job_id[:8] + rich_ui.console.print( + f"⚡ Job [dim]{short_job_id}[/dim]: [{color}]{status}[/{color}]" + ) else: print(f"Job:{job_id} | Status: {status}") @@ -475,6 +495,8 @@ def format_console_url(url: str) -> None: def format_endpoint_created(endpoint_id: str, endpoint_name: str) -> None: """Display endpoint creation in Rich format""" if rich_ui.enabled and rich_ui.console: - rich_ui.console.print(f"✨ Created endpoint [cyan]{endpoint_name}[/cyan] [dim]({endpoint_id})[/dim]") + rich_ui.console.print( + f"✨ Created endpoint [cyan]{endpoint_name}[/cyan] [dim]({endpoint_id})[/dim]" + ) else: - print(f"Created endpoint: {endpoint_id} - {endpoint_name}") \ No newline at end of file + print(f"Created endpoint: {endpoint_id} - {endpoint_name}") diff --git a/src/tetra_rp/logger.py b/src/tetra_rp/logger.py index 4cc40c38..98b935a1 100644 --- a/src/tetra_rp/logger.py +++ b/src/tetra_rp/logger.py @@ -6,6 +6,7 @@ # Import Rich UI components with fallback try: from .core.utils.rich_ui import get_rich_handler, is_rich_enabled + RICH_UI_AVAILABLE = True except ImportError: RICH_UI_AVAILABLE = False @@ -38,10 +39,10 @@ def setup_logging( else: # Default format for INFO level and above fmt = "%(asctime)s | %(levelname)-5s | %(message)s" - + handler = logging.StreamHandler(stream) handler.setFormatter(logging.Formatter(fmt)) - + root_logger.setLevel(level) root_logger.addHandler(handler) @@ -54,11 +55,18 @@ def setup_logging( if not os.environ.get("LOG_LEVEL"): root_logger.setLevel(logging.WARNING) # Also suppress specific noisy loggers - for logger_name in ["tetra_rp", "serverless", "resource_manager", "LiveServerlessStub", "asyncio"]: + for logger_name in [ + "tetra_rp", + "serverless", + "resource_manager", + "LiveServerlessStub", + "asyncio", + ]: specific_logger = logging.getLogger(logger_name) specific_logger.setLevel(logging.WARNING) - + # Add global filter to catch any remaining debug messages from .core.utils.rich_ui import RichLoggingFilter + global_filter = RichLoggingFilter() root_logger.addFilter(global_filter) From b16a12885d07ecdacea5ab24fa5f9ed9d6b5c33b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dean=20Qui=C3=B1anola?= Date: Wed, 23 Jul 2025 15:06:46 -0700 Subject: [PATCH 7/7] chore: lint fix --- src/tetra_rp/core/api/runpod.py | 2 -- uv.lock | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/tetra_rp/core/api/runpod.py b/src/tetra_rp/core/api/runpod.py index 6d3e29e1..4aab0e7a 100644 --- a/src/tetra_rp/core/api/runpod.py +++ b/src/tetra_rp/core/api/runpod.py @@ -9,8 +9,6 @@ from typing import Any, Dict, Optional import aiohttp -from typing import Dict, Any, Optional -import logging from ..utils.rich_ui import format_endpoint_created diff --git a/uv.lock b/uv.lock index a206ead7..7e524771 100644 --- a/uv.lock +++ b/uv.lock @@ -2378,7 +2378,7 @@ requires-dist = [ { name = "python-dotenv", specifier = ">=1.0.0" }, { name = "rich", 
specifier = ">=14.0.0" }, { name = "rich", marker = "extra == 'rich'", specifier = ">=13.0.0" }, - { name = "runpod", specifier = "~=1.7.9" }, + { name = "runpod" }, ] provides-extras = ["rich"]
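Usage sketch for the print-capture flow introduced above, assuming `capture_local_prints` is importable from the top-level `tetra_rp` package (per the export added in PATCH 1/7) and that the Rich UI is enabled; based on the panel helpers in rich_ui.py, prints issued inside the context appear intended to be buffered and surfaced as a "Local Output" panel on exit, falling back to ordinary stdout when Rich is unavailable.

    # Hypothetical usage sketch, not part of the patches above.
    # Assumes tetra_rp is installed (optionally with the "rich" extra)
    # and that rich_ui.enabled resolves to True in this environment.
    from tetra_rp import capture_local_prints

    def main() -> None:
        # Anything printed inside the context is captured by rich_ui;
        # with Rich disabled the context is a no-op and prints pass through.
        with capture_local_prints():
            print("preparing payload")
            print("submitting job")

    if __name__ == "__main__":
        main()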