diff --git a/README.md b/README.md index bc067fd..5507c49 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ Obtiene información sobre el deploy: curl http://localhost:8000/ ``` -#### POST /up|down|stop +#### POST /up|down|kill|stop Controla el deploy: ```bash @@ -103,6 +103,12 @@ Lista el estado de todos los contenedores: curl http://localhost:8000/containers ``` +#### GET SSE /containers/events +Información en tiempo real sobre los cambios de estado de los contenedores (SSE): +```bash +curl -N http://localhost:8000/containers/events +``` + #### POST /containers/{name}/start|stop|restart Controla un contenedor específico: @@ -121,13 +127,6 @@ ws.onmessage = (event) => { }; ``` -#### GET /ping -Health check del servicio: - -```bash -curl http://localhost:8000/ping -``` - ## Documentación Interactiva FastAPI genera automáticamente documentación interactiva: diff --git a/data/compose.yaml b/data/compose.yaml index 938f5ff..f0e9d41 100644 --- a/data/compose.yaml +++ b/data/compose.yaml @@ -8,9 +8,16 @@ services: - mongodbdata:/data/configdb - /etc/timezone:/etc/timezone:ro - /etc/localtime:/etc/localtime:ro + healthcheck: + test: ["CMD-SHELL", "mongosh --quiet --eval 'db.getMongo().getDBNames().indexOf(\"open5gs\")' || exit 1"] + interval: 5s + timeout: 3s + start_period: 5s expose: - "27017/udp" - "27017/tcp" + ports: + - "27017:27017/tcp" networks: default: ipv4_address: ${MONGO_IP} @@ -29,6 +36,10 @@ services: - "9999/tcp" ports: - "9999:9999/tcp" + healthcheck: + test: ["CMD", "pidof", "npm run dev"] + interval: 5s + timeout: 3s networks: default: ipv4_address: ${WEBUI_IP} @@ -53,6 +64,10 @@ services: - "3868/sctp" - "5868/tcp" - "5868/sctp" + healthcheck: + test: ["CMD", "pidof", "open5gs-hssd"] + interval: 5s + timeout: 3s networks: default: ipv4_address: ${HSS_IP} @@ -73,6 +88,10 @@ services: expose: - "2123/udp" - "8805/udp" + healthcheck: + test: ["CMD", "pidof", "open5gs-sgwcd"] + interval: 5s + timeout: 3s networks: default: ipv4_address: ${SGWC_IP} @@ -96,6 +115,10 @@ services: - "2152/udp" ports: - "${SGWU_ADVERTISE_IP}:2152:2152/udp" + healthcheck: + test: ["CMD", "pidof", "open5gs-sgwud"] + interval: 5s + timeout: 3s networks: default: ipv4_address: ${SGWU_IP} @@ -132,6 +155,10 @@ services: - "2123/udp" - "7777/tcp" - "9091/tcp" + healthcheck: + test: ["CMD", "pidof", "open5gs-smfd"] + interval: 5s + timeout: 3s networks: default: ipv4_address: ${SMF_IP} @@ -165,6 +192,10 @@ services: sysctls: - net.ipv4.ip_forward=1 - net.ipv6.conf.all.disable_ipv6=0 + healthcheck: + test: ["CMD", "pidof", "open5gs-upfd"] + interval: 5s + timeout: 3s networks: default: ipv4_address: ${UPF_IP} @@ -189,7 +220,6 @@ services: - SGWC_IP=${SGWC_IP} - SMF_IP=${SMF_IP} - HSS_IP=${HSS_IP} - volumes: - /etc/timezone:/etc/timezone:ro - /etc/localtime:/etc/localtime:ro @@ -203,6 +233,10 @@ services: - "9091/tcp" ports: - "36412:36412/sctp" + healthcheck: + test: ["CMD", "pidof", "open5gs-mmed"] + interval: 5s + timeout: 3s networks: default: ipv4_address: ${MME_IP} @@ -231,6 +265,10 @@ services: ports: - "${PCRF_BIND_PORT}:${PCRF_BIND_PORT}/sctp" - "${PCRF_BIND_PORT}:${PCRF_BIND_PORT}/tcp" + healthcheck: + test: ["CMD", "pidof", "open5gs-pcrfd"] + interval: 5s + timeout: 3s networks: default: ipv4_address: ${PCRF_IP} diff --git a/main.py b/main.py index 3abe6ff..b71c969 100644 --- a/main.py +++ b/main.py @@ -26,7 +26,6 @@ app = create_app( compose_file=COMPOSE_FILE, env_file=ENV_FILE, - include_routers=True, ) if __name__ == "__main__": diff --git a/pyproject.toml b/pyproject.toml index 
5c3a7b1..fc1a4e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "base-deployment-controller" -version = "0.1.0" +version = "0.2.0" description = "REST API to control the basic operations of a deployment" readme = "README.md" requires-python = ">=3.8" diff --git a/src/base_deployment_controller/__init__.py b/src/base_deployment_controller/__init__.py index e6ee606..2a50951 100644 --- a/src/base_deployment_controller/__init__.py +++ b/src/base_deployment_controller/__init__.py @@ -1,9 +1,15 @@ """Base Deployment Controller package entry point.""" +import asyncio +from contextlib import asynccontextmanager + from fastapi import FastAPI from .services.config import ConfigService +from .services.task_manager import TaskManager +from .routers.api import APIRoutes from .routers.environment import EnvRoutes from .routers.container import ContainerRoutes +from .services.status_event_manager import StatusEventManager from .routers.deployment import DeploymentRoutes from .builder import AppBuilder @@ -11,7 +17,6 @@ def create_app( compose_file: str = "compose.yaml", env_file: str = ".env", - include_routers: bool = True, title: str = "Base Deployment Controller", description: str = "REST API to control the basic operations of a deployment", version: str = "1.0.0", @@ -22,7 +27,6 @@ def create_app( Args: compose_file: Path to compose.yaml file. env_file: Path to .env file. - include_routers: If True, registers base routers (envs, containers, deployment). title: FastAPI application title. description: FastAPI application description. version: Application version string. @@ -30,27 +34,52 @@ def create_app( Returns: FastAPI app ready to use or extend. """ + config_service = ConfigService(compose_file, env_file) + task_manager = TaskManager(ttl=3600) # 1 hour TTL for completed tasks + + @asynccontextmanager + async def lifespan(app: FastAPI): + # Startup: start cleanup task + cleanup_task = asyncio.create_task(_cleanup_loop(task_manager)) + yield + # Shutdown: cancel cleanup task + cleanup_task.cancel() + try: + await cleanup_task + except asyncio.CancelledError: + pass + + async def _cleanup_loop(task_manager: TaskManager): + """Background loop for cleaning up old tasks.""" + while True: + await asyncio.sleep(300) # Run every 5 minutes + await task_manager.cleanup_old_tasks() + app = FastAPI( title=title, description=description, version=version, + lifespan=lifespan, ) + + api_routes = APIRoutes() + env_routes = EnvRoutes(config_service, task_manager) + status_events = StatusEventManager(config_service) + container_routes = ContainerRoutes(config_service, task_manager, status_events) + deployment_routes = DeploymentRoutes(config_service, task_manager) - if include_routers: - config_service = ConfigService(compose_file, env_file) - env_routes = EnvRoutes(config_service) - container_routes = ContainerRoutes(config_service) - deployment_routes = DeploymentRoutes(config_service) - - app.include_router(env_routes.router) - app.include_router(container_routes.router) - app.include_router(deployment_routes.router) + app.include_router(api_routes.router) + app.include_router(env_routes.router) + app.include_router(container_routes.router) + app.include_router(deployment_routes.router) return app __all__ = [ "ConfigService", + "TaskManager", + "APIRoutes", "EnvRoutes", "ContainerRoutes", "DeploymentRoutes", diff --git a/src/base_deployment_controller/builder.py b/src/base_deployment_controller/builder.py index bada691..e58a5af 100644 --- 
a/src/base_deployment_controller/builder.py +++ b/src/base_deployment_controller/builder.py @@ -4,6 +4,9 @@ from fastapi import APIRouter, FastAPI from .services.config import ConfigService +from .services.task_manager import TaskManager +from .services.status_event_manager import StatusEventManager +from .routers.api import APIRoutes from .routers.environment import EnvRoutes from .routers.container import ContainerRoutes from .routers.deployment import DeploymentRoutes @@ -76,10 +79,15 @@ def build(self) -> FastAPI: ) config_service = ConfigService(self.compose_file, self.env_file) - env_routes = EnvRoutes(config_service) - container_routes = ContainerRoutes(config_service) - deployment_routes = DeploymentRoutes(config_service) - + task_manager = TaskManager(ttl=3600) + status_events = StatusEventManager(config_service) + + api_routes = APIRoutes() + env_routes = EnvRoutes(config_service, task_manager) + container_routes = ContainerRoutes(config_service, task_manager, status_events) + deployment_routes = DeploymentRoutes(config_service, task_manager) + + app.include_router(api_routes.router) app.include_router(env_routes.router) app.include_router(container_routes.router) app.include_router(deployment_routes.router) diff --git a/src/base_deployment_controller/models/__init__.py b/src/base_deployment_controller/models/__init__.py index 45f2bfe..c2cb9c6 100644 --- a/src/base_deployment_controller/models/__init__.py +++ b/src/base_deployment_controller/models/__init__.py @@ -1,33 +1,30 @@ """Pydantic models for the Base Deployment Controller.""" +from .api import APIInfoResponse from .environment import ( EnvVariable, EnvVariablesResponse, BulkEnvUpdateRequest, EnvUpdateResponse, ) -from .container import ContainerInfo, ContainersInfoResponse, ContainerControlResponse +from .container import ContainerInfo, ContainersInfoResponse from .deployment import ( DeploymentStatus, DeploymentMetadata, DeploymentInfoResponse, - DeploymentPingResponse, - DeploymentActionResponse, ) from .compose import ComposeActionResponse __all__ = [ + "APIInfoResponse", "EnvVariable", "EnvVariablesResponse", "BulkEnvUpdateRequest", "EnvUpdateResponse", "ContainerInfo", "ContainersInfoResponse", - "ContainerControlResponse", "DeploymentStatus", "DeploymentMetadata", "DeploymentInfoResponse", - "DeploymentPingResponse", - "DeploymentActionResponse", "ComposeActionResponse", ] diff --git a/src/base_deployment_controller/models/api.py b/src/base_deployment_controller/models/api.py new file mode 100644 index 0000000..998f00b --- /dev/null +++ b/src/base_deployment_controller/models/api.py @@ -0,0 +1,10 @@ +"""API root endpoint models.""" +from pydantic import BaseModel, Field + + +class APIInfoResponse(BaseModel): + """Response with general API information.""" + + name: str = Field(..., description="API name") + status: str = Field(..., description="API status") + message: str = Field(..., description="Status message") diff --git a/src/base_deployment_controller/models/container.py b/src/base_deployment_controller/models/container.py index 593c5d0..33da894 100644 --- a/src/base_deployment_controller/models/container.py +++ b/src/base_deployment_controller/models/container.py @@ -20,12 +20,3 @@ class ContainersInfoResponse(BaseModel): """Response with list of containers and their current status.""" containers: list[ContainerInfo] = Field(..., description="List of containers") - - -class ContainerControlResponse(BaseModel): - """Response after executing a control action on a container.""" - - success: bool = Field(..., 
description="Action success status") - container: str = Field(..., description="Container name") - action: str = Field(..., description="Action performed") - message: str = Field(..., description="Status message") diff --git a/src/base_deployment_controller/models/deployment.py b/src/base_deployment_controller/models/deployment.py index d40004e..acbe23a 100644 --- a/src/base_deployment_controller/models/deployment.py +++ b/src/base_deployment_controller/models/deployment.py @@ -47,11 +47,3 @@ class DeploymentPingResponse(BaseModel): success: bool = Field(..., description="Ping success status") message: str = Field(..., description="Ping message") - - -class DeploymentActionResponse(BaseModel): - """Response after performing a deployment action.""" - - success: bool = Field(..., description="Action success status") - action: str = Field(..., description="Action performed") - message: str = Field(..., description="Status message") \ No newline at end of file diff --git a/src/base_deployment_controller/models/events.py b/src/base_deployment_controller/models/events.py new file mode 100644 index 0000000..519ce2e --- /dev/null +++ b/src/base_deployment_controller/models/events.py @@ -0,0 +1,34 @@ +""" +Event models for container status streaming via SSE. +""" +from datetime import datetime +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + + +class ServiceState(str, Enum): + """State of a service/container for status events.""" + + NOT_STARTED = "not_started" + PULLING = "pulling" + PULLED = "pulled" + CREATING = "creating" + STARTING = "starting" + STARTED = "started" + STOPPING = "stopping" + STOPPED = "stopped" + REMOVING = "removing" + REMOVED = "removed" + ERROR = "error" + + +class ContainerStatusEvent(BaseModel): + """Container status change event for SSE streaming.""" + + container_name: str = Field(..., description="Docker container name") + state: ServiceState = Field(..., description="New state") + prev_state: Optional[ServiceState] = Field(None, description="Previous state if known") + action: str = Field(..., description="Docker event action that triggered the state change") + timestamp: datetime = Field(..., description="Event timestamp") diff --git a/src/base_deployment_controller/models/task.py b/src/base_deployment_controller/models/task.py new file mode 100644 index 0000000..8ea4ed9 --- /dev/null +++ b/src/base_deployment_controller/models/task.py @@ -0,0 +1,36 @@ +""" +Task management models for async operations. 
+""" +from datetime import datetime +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + + +class TaskStatus(str, Enum): + """Status of a background task.""" + + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + + +class TaskResponse(BaseModel): + """Initial response when creating a background task.""" + + task_id: str = Field(..., description="Unique identifier for the task") + status: TaskStatus = Field(..., description="Current status of the task") + + +class TaskDetail(BaseModel): + """Detailed information about a task's execution.""" + + task_id: str = Field(..., description="Unique identifier for the task") + task_status: TaskStatus = Field(..., description="Overall task status") + operation: str = Field(..., description="Operation being performed (up, down, start, stop, etc.)") + error: Optional[str] = Field(None, description="Error message if task failed") + created_at: datetime = Field(..., description="When the task was created") + updated_at: datetime = Field(..., description="Last update timestamp") + completed_at: Optional[datetime] = Field(None, description="When the task completed (success or failure)") \ No newline at end of file diff --git a/src/base_deployment_controller/routers/__init__.py b/src/base_deployment_controller/routers/__init__.py index f9b11a5..9edda18 100644 --- a/src/base_deployment_controller/routers/__init__.py +++ b/src/base_deployment_controller/routers/__init__.py @@ -1,7 +1,8 @@ """FastAPI routers for the Base Deployment Controller.""" +from .api import APIRoutes from .environment import EnvRoutes from .container import ContainerRoutes from .deployment import DeploymentRoutes -__all__ = ["EnvRoutes", "ContainerRoutes", "DeploymentRoutes"] +__all__ = ["APIRoutes", "EnvRoutes", "ContainerRoutes", "DeploymentRoutes"] diff --git a/src/base_deployment_controller/routers/api.py b/src/base_deployment_controller/routers/api.py new file mode 100644 index 0000000..dfa9426 --- /dev/null +++ b/src/base_deployment_controller/routers/api.py @@ -0,0 +1,58 @@ +""" +API root endpoints for health check and general information. +""" +import logging + +from fastapi import APIRouter + +from ..models.api import APIInfoResponse + +logger = logging.getLogger(__name__) + + +class APIRoutes: + """ + API root router for general endpoints. + + Provides basic health checks and API information at the root path. + + Attributes: + router: Instance of `APIRouter` with root endpoints. + """ + + def __init__(self) -> None: + """Initialize API root routes.""" + self.router = self._build_router() + + def _build_router(self) -> APIRouter: + """ + Build and configure the router with root endpoints. + + Returns: + APIRouter configured with root handlers. + """ + router = APIRouter(tags=["API"]) + + # GET / - API health check and info + router.add_api_route( + "/", + self.get_api_info, + methods=["GET"], + response_model=APIInfoResponse, + ) + + return router + + async def get_api_info(self) -> APIInfoResponse: + """ + Get API health status and general information. + + Returns: + APIInfoResponse indicating API is operational. 
+ """ + logger.debug("API info endpoint requested") + return APIInfoResponse( + name="Base Deployment Controller", + status="operational", + message="API is active and operational", + ) diff --git a/src/base_deployment_controller/routers/container.py b/src/base_deployment_controller/routers/container.py index 22bca90..8aea95e 100644 --- a/src/base_deployment_controller/routers/container.py +++ b/src/base_deployment_controller/routers/container.py @@ -2,18 +2,22 @@ Container management routes implemented with a class and dependency injection. """ import asyncio +import json import logging -from typing import Any, Dict +import threading +import time +from typing import AsyncIterator, Dict, Optional, Set -from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect +from fastapi import APIRouter, HTTPException, Request, WebSocket, WebSocketDisconnect +from fastapi.responses import Response, StreamingResponse from python_on_whales.exceptions import DockerException -from ..models.container import ( - ContainerInfo, - ContainersInfoResponse, - ContainerControlResponse, -) +from ..models.container import ContainerInfo, ContainersInfoResponse +from ..models.task import TaskResponse, TaskDetail, TaskStatus +from ..models.events import ContainerStatusEvent, ServiceState +from ..services.status_event_manager import StatusEventManager from ..services.config import ConfigService +from ..services.task_manager import TaskManager logger = logging.getLogger(__name__) @@ -22,25 +26,30 @@ class ContainerRoutes: """ Docker containers router built with dependency injection. - Provides endpoints for retrieving container status, controlling containers, + Provides endpoints for retrieving container status, controlling containers asynchronously, and streaming container logs via WebSocket. Args: config: Instance of `ConfigService` for Compose and Docker access. + task_manager: Instance of `TaskManager` for async operations. Attributes: config: Injected configuration service. + task_manager: Injected task manager service. router: Instance of `APIRouter` with `/containers` endpoints. """ - def __init__(self, config: ConfigService) -> None: + def __init__(self, config: ConfigService, task_manager: TaskManager, status_events: StatusEventManager) -> None: """ Initialize container routes. Args: config: Configuration service instance for dependency injection. + task_manager: Task manager instance for dependency injection. """ self.config = config + self.task_manager = task_manager + self.status_events = status_events self.router = self._build_router() def _build_router(self) -> APIRouter: @@ -51,6 +60,10 @@ def _build_router(self) -> APIRouter: APIRouter configured with GET and POST handlers for /containers. 
""" router = APIRouter(prefix="/containers", tags=["Containers"]) + + # IMPORTANT: Register specific routes BEFORE path parameter routes + # to avoid /events being matched by /{container_name} + # GET /containers - list all containers router.add_api_route( "", @@ -58,6 +71,12 @@ def _build_router(self) -> APIRouter: methods=["GET"], response_model=ContainersInfoResponse, ) + # GET /containers/events - SSE container status events (MUST be before /{container_name}) + router.add_api_route( + "/events", + self.stream_container_events, + methods=["GET"], + ) # GET /containers/ - get specific container info router.add_api_route( "/{container_name}", @@ -70,21 +89,34 @@ def _build_router(self) -> APIRouter: "/{container_name}/start", self.container_start, methods=["POST"], - response_model=ContainerControlResponse, + status_code=202, ) # POST /containers//stop - stop container router.add_api_route( "/{container_name}/stop", self.container_stop, methods=["POST"], - response_model=ContainerControlResponse, + status_code=202, ) # POST /containers//restart - restart container router.add_api_route( "/{container_name}/restart", self.container_restart, methods=["POST"], - response_model=ContainerControlResponse, + status_code=202, + ) + # GET /containers//tasks/{task_id} - get task status + router.add_api_route( + "/{container_name}/tasks/{task_id}", + self.get_task_status, + methods=["GET"], + response_model=TaskDetail, + ) + # GET /containers//tasks/{task_id}/stream - SSE stream + router.add_api_route( + "/{container_name}/tasks/{task_id}/stream", + self.stream_task_progress, + methods=["GET"], ) # WebSocket /containers//logs - stream container logs router.websocket("/{container_name}/logs")(self.container_logs) @@ -141,7 +173,9 @@ async def get_containers(self) -> ContainersInfoResponse: depends_on=self.config.get_service_dependencies(service_name), ) ) - logger.info(f"Successfully retrieved status for {len(containers)} containers") + logger.info( + f"Successfully retrieved status for {len(containers)} containers" + ) return ContainersInfoResponse(containers=containers) except Exception as e: logger.error(f"Failed to get container status: {e}") @@ -201,74 +235,84 @@ async def get_container(self, container_name: str) -> ContainerInfo: ) async def container_start( - self, container_name: str - ) -> ContainerControlResponse: + self, container_name: str, request: Request + ) -> Response: """ - Start a container. + Start a container asynchronously. + + Returns 202 Accepted with task_id for tracking progress. Args: container_name: Name of the service/container from compose.yaml. + request: FastAPI request object (for building Location header). Returns: - ContainerControlResponse with action result. + Response with 202 status and TaskResponse body. Raises: - HTTPException: If service/container not found or action fails. + HTTPException: If service/container not found. """ - return await self._control_container(container_name, "start") + return await self._control_container_async(container_name, "start", request) - async def container_stop( - self, container_name: str - ) -> ContainerControlResponse: + async def container_stop(self, container_name: str, request: Request) -> Response: """ - Stop a container. + Stop a container asynchronously. + + Returns 202 Accepted with task_id for tracking progress. Args: container_name: Name of the service/container from compose.yaml. + request: FastAPI request object (for building Location header). Returns: - ContainerControlResponse with action result. 
+ Response with 202 status and TaskResponse body. Raises: - HTTPException: If service/container not found or action fails. + HTTPException: If service/container not found. """ - return await self._control_container(container_name, "stop") + return await self._control_container_async(container_name, "stop", request) async def container_restart( - self, container_name: str - ) -> ContainerControlResponse: + self, container_name: str, request: Request + ) -> Response: """ - Restart a container. + Restart a container asynchronously. + + Returns 202 Accepted with task_id for tracking progress. Args: container_name: Name of the service/container from compose.yaml. + request: FastAPI request object (for building Location header). Returns: - ContainerControlResponse with action result. + Response with 202 status and TaskResponse body. Raises: - HTTPException: If service/container not found or action fails. + HTTPException: If service/container not found. """ - return await self._control_container(container_name, "restart") + return await self._control_container_async(container_name, "restart", request) - async def _control_container( - self, container_name: str, action: str - ) -> ContainerControlResponse: + async def _control_container_async( + self, container_name: str, action: str, request: Request + ) -> Response: """ - Internal method to control a container (start/stop/restart). + Internal method to control a container asynchronously (start/stop/restart). Args: container_name: Name of the service/container from compose.yaml. action: Action to perform (start, stop, restart). + request: FastAPI request object (for building Location header). Returns: - ContainerControlResponse with action result. + Response with 202 status and TaskResponse body. Raises: - HTTPException: If service/container not found or action fails. + HTTPException: If service/container not found. 
""" try: - logger.debug(f"Control request for container: {container_name}, action: {action}") + logger.debug( + f"Async control request for container: {container_name}, action: {action}" + ) services = self.config.compose_services if container_name not in services: logger.warning(f"Service not found: {container_name}") @@ -276,9 +320,11 @@ async def _control_container( status_code=404, detail=f"Service '{container_name}' not found in compose.yaml", ) + service_config = services[container_name] actual_container_name = service_config.get("container_name", container_name) client = self.config.get_docker_client() + if not client.container.exists(actual_container_name): logger.warning(f"Container not found: {actual_container_name}") raise HTTPException( @@ -286,36 +332,122 @@ async def _control_container( detail=f"Container '{actual_container_name}' not found in Docker", ) - logger.info(f"Executing {action} on container: {actual_container_name}") - try: - if action == "start": - client.container.start(actual_container_name) - elif action == "stop": - client.container.stop(actual_container_name) - elif action == "restart": - client.container.restart(actual_container_name) - logger.info(f"Container {actual_container_name} {action}ed successfully") - except DockerException as e: - logger.error( - f"Docker error while executing {action} on {actual_container_name}: {e}" + # Create async task + task_id = await self.task_manager.create_task( + operation=f"container_{action}", + func=lambda: self._execute_container_action( + task_id, actual_container_name, action, container_name ) - raise HTTPException(status_code=500, detail=f"Docker error: {e}") - return ContainerControlResponse( - success=True, - container=container_name, - action=action, - message=f"Container {action}ed successfully", + ) + + # Build Location header + location = str( + request.url_for( + "get_task_status", container_name=container_name, task_id=task_id + ) + ) + + logger.info( + f"Container {action} task created for {container_name}: {task_id}" + ) + + # Return 202 Accepted + return Response( + status_code=202, + content=TaskResponse( + task_id=task_id, status=TaskStatus.RUNNING + ).model_dump_json(), + media_type="application/json", + headers={"Location": location}, ) except HTTPException: raise except Exception as e: - logger.error(f"Failed to {action} container {container_name}: {e}") + logger.error(f"Failed to create {action} task for {container_name}: {e}") raise HTTPException( status_code=500, detail=f"Failed to {action} container: {e}", ) - async def container_logs(self, websocket: WebSocket, container_name: str) -> None: + async def get_task_status(self, container_name: str, task_id: str) -> TaskDetail: + """ + Get the current status of a container task. + + Args: + container_name: Name of the service/container (for route consistency). + task_id: The unique identifier of the task. + + Returns: + TaskDetail with current task status, progress, and result. + + Raises: + HTTPException: If task not found. + """ + task = self.task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + + return task + + async def stream_task_progress( + self, container_name: str, task_id: str + ) -> StreamingResponse: + """ + Stream task progress updates via Server-Sent Events (SSE). + + Args: + container_name: Name of the service/container (for route consistency). + task_id: The unique identifier of the task. + + Returns: + StreamingResponse with SSE stream of task updates. 
+ + Raises: + HTTPException: If task not found. + """ + task = self.task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + + async def event_generator() -> AsyncIterator[str]: + """Generate SSE events for task updates.""" + last_update = None + + while True: + current_task = self.task_manager.get_task(task_id) + if not current_task: + yield f"event: error\ndata: {json.dumps({'error': 'Task not found'})}\n\n" + break + + # Send update if task changed + if current_task.model_dump() != last_update: + last_update = current_task.model_dump() + yield f"data: {current_task.model_dump_json()}\n\n" + + # If task completed or failed, send final event and stop + if current_task.task_status in [ + TaskStatus.COMPLETED, + TaskStatus.FAILED, + ]: + yield f"event: done\ndata: {current_task.model_dump_json()}\n\n" + break + + # Wait before next check + await asyncio.sleep(0.2) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + async def container_logs( + self, websocket: WebSocket, container_name: str + ) -> None: """ Stream container logs in real-time via WebSocket. @@ -347,7 +479,9 @@ async def container_logs(self, websocket: WebSocket, container_name: str) -> Non f"WebSocket logs requested for non-existent container: {actual_container_name}" ) await websocket.send_json( - {"error": f"Container '{actual_container_name}' not found in Docker"} + { + "error": f"Container '{actual_container_name}' not found in Docker" + } ) await websocket.close() return @@ -378,7 +512,9 @@ async def container_logs(self, websocket: WebSocket, container_name: str) -> Non await asyncio.sleep(0) except DockerException as e: - logger.error(f"Failed to stream logs from {actual_container_name}: {e}") + logger.error( + f"Failed to stream logs from {actual_container_name}: {e}" + ) await websocket.send_json({"error": f"Failed to stream logs: {e}"}) except WebSocketDisconnect: logger.debug(f"WebSocket client disconnected for {container_name}") @@ -396,3 +532,86 @@ async def container_logs(self, websocket: WebSocket, container_name: str) -> Non except: pass logger.info(f"WebSocket connection closed for {container_name}") + + def _execute_container_action( + self, task_id: str, container_name: str, action: str, service_name: str + ) -> None: + """ + Execute container action (start/stop/restart). + + Runs in thread executor. TaskManager auto-manages state transitions: + PENDING -> RUNNING (on start) -> COMPLETED (success) or FAILED (exception). + Any exception is caught and its message stored in task.error. + + Args: + task_id: Task identifier (for logging). + container_name: Docker container name. + action: Action (start/stop/restart). + service_name: Service name (for logging). + + Raises: + Exception: On docker error. Message stored in task.error. 
+ """ + try: + client = self.config.get_docker_client() + logger.info(f"[{task_id}] Executing container {action}") + if action == "start": + client.container.start(container_name) + elif action == "stop": + client.container.stop(container_name) + elif action == "restart": + client.container.restart(container_name) + logger.info(f"[{task_id}] Container {action} completed successfully") + + except DockerException as e: + logger.error(f"[{task_id}] Docker error while {action}ing {container_name}: {e}") + raise Exception(f"Docker error: {e}") + except Exception as e: + logger.error(f"[{task_id}] Error executing {action} on {container_name}: {e}") + raise + + async def stream_container_events(self) -> StreamingResponse: + """ + SSE endpoint streaming global container status events (on-demand). + + Clients receive ContainerStatusEvent objects containing: + - container_name: name of the container + - state: current ServiceState + - prev_state: previous state if known + - action: Docker event action that triggered the change + - timestamp: event timestamp + + The StatusEventManager starts listening to docker.system.events() when + the first client connects and stops when the last client disconnects. + """ + + async def event_generator() -> AsyncIterator[str]: + # Subscribe; start manager if first subscriber + subscriber_q = self.status_events.subscribe() + try: + while True: + # Poll queue for events with async support + try: + event: ContainerStatusEvent = await self.status_events.get_event(subscriber_q) + logger.debug(f"Emitting container event: {event.container_name} -> {event.state}") + payload = event.model_dump_json() + yield f"{payload}\n" + except asyncio.CancelledError: + break + # Small sleep to prevent busy waiting + await asyncio.sleep(0.01) + except Exception as e: + logger.error(f"Container events generator error: {e}") + finally: + self.status_events.unsubscribe(subscriber_q) + logger.debug("Container events subscriber disconnected") + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) diff --git a/src/base_deployment_controller/routers/deployment.py b/src/base_deployment_controller/routers/deployment.py index c8528c5..c8aa686 100644 --- a/src/base_deployment_controller/routers/deployment.py +++ b/src/base_deployment_controller/routers/deployment.py @@ -1,89 +1,123 @@ """ Deployment management routes implemented with a class and dependency injection. -Manages deployment-wide operations: status, up, stop, down, restart, ping. +Manages deployment-wide operations: status, up, stop, down, restart. """ +import asyncio +import json import logging -from typing import Any, Dict +from typing import AsyncIterator, Dict, Set -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import Response, StreamingResponse -from ..models.deployment import DeploymentMetadata, DeploymentStatus, DeploymentInfoResponse, DeploymentPingResponse, DeploymentActionResponse -from ..models.compose import ComposeActionResponse +from ..models.deployment import DeploymentInfoResponse from ..models.environment import EnvVariable +from ..models.task import TaskResponse, TaskDetail, TaskStatus from ..services.config import ConfigService +from ..services.task_manager import TaskManager logger = logging.getLogger(__name__) class DeploymentRoutes: """ - Root deployment router built with dependency injection. 
- - Manages deployment-wide operations at the root endpoint and control endpoints: - - GET / - Get deployment status with metadata and env-vars - - POST /up - Start deployment - - POST /stop - Stop deployment - - POST /down - Down deployment (stop and remove containers) - - POST /restart - Restart deployment - - GET /ping - Health check + Deployment router built with dependency injection. + + Manages deployment-wide operations: + - GET /deployment - Get deployment status with metadata and env-vars + - POST /deployment/up - Start deployment (async) + - POST /deployment/stop - Stop deployment (async) + - POST /deployment/down - Down deployment (async) + - POST /deployment/kill - Kill deployment (async) + - POST /deployment/restart - Restart deployment (async) + - GET /deployment/tasks/{task_id} - Get task status + - GET /deployment/tasks/{task_id}/stream - SSE stream of task progress Args: config: Instance of `ConfigService` for Compose and Docker access. + task_manager: Instance of `TaskManager` for async operations. Attributes: config: Injected configuration service. - router: Instance of `APIRouter` with root endpoints. + task_manager: Injected task manager service. + router: Instance of `APIRouter` with /deployment endpoints. """ - def __init__(self, config: ConfigService) -> None: + def __init__(self, config: ConfigService, task_manager: TaskManager) -> None: """ Initialize deployment routes. Args: config: Configuration service instance for dependency injection. + task_manager: Task manager instance for dependency injection. """ self.config = config + self.task_manager = task_manager self.router = self._build_router() def _build_router(self) -> APIRouter: """ - Build and configure the router with deployment endpoints at root level. + Build and configure the router with deployment endpoints. Returns: APIRouter configured with GET and POST handlers. 
""" - router = APIRouter(tags=["Deployment"]) - # Note: These routes will be registered without prefix in main.py - # to mount them at root level: /, /ping, /up, /stop, /down, /restart + router = APIRouter(prefix="/deployment", tags=["Deployment"]) + + # GET /deployment - deployment info router.add_api_route( - "/", + "", self.get_deployment_info, methods=["GET"], + response_model=DeploymentInfoResponse, ) - router.add_api_route( - "/ping", - self.ping, - methods=["GET"], - ) + # POST /deployment/up - start deployment router.add_api_route( "/up", self.deploy_up, methods=["POST"], + status_code=202, ) + # POST /deployment/stop - stop deployment router.add_api_route( "/stop", self.deploy_stop, methods=["POST"], + status_code=202, ) + # POST /deployment/down - down deployment router.add_api_route( "/down", self.deploy_down, methods=["POST"], + status_code=202, ) + # POST /deployment/kill - kill deployment + router.add_api_route( + "/kill", + self.deploy_kill, + methods=["POST"], + status_code=202, + ) + # POST /deployment/restart - restart deployment router.add_api_route( "/restart", self.deploy_restart, methods=["POST"], + status_code=202, + ) + # GET /deployment/tasks/{task_id} - get task status + router.add_api_route( + "/tasks/{task_id}", + self.get_task_status, + methods=["GET"], + response_model=TaskDetail, + ) + # GET /deployment/tasks/{task_id}/stream - SSE stream + router.add_api_route( + "/tasks/{task_id}/stream", + self.stream_task_progress, + methods=["GET"], ) return router @@ -101,10 +135,10 @@ async def get_deployment_info(self) -> DeploymentInfoResponse: logger.debug("Fetching deployment info") # Get metadata - metadata_dict: DeploymentMetadata = self.config.get_deployment_metadata() + metadata_dict = self.config.get_deployment_metadata() # Get status - status: DeploymentStatus = self.config.get_deployment_status() + status = self.config.get_deployment_status() # Get environment variables schema = self.config.get_env_vars_schema() @@ -122,160 +156,357 @@ async def get_deployment_info(self) -> DeploymentInfoResponse: advanced=var_schema.get("advanced", False), ) - logger.info("Successfully retrieved deployment info") + logger.info("Successfully retrieved deployment info") return DeploymentInfoResponse( - metadata=metadata_dict, - status=status, - env_vars=env_vars) + metadata=metadata_dict, status=status, env_vars=env_vars + ) except Exception as e: logger.error(f"Failed to get deployment info: {e}") raise HTTPException( status_code=500, detail=f"Failed to get deployment info: {e}" ) - async def ping(self) -> DeploymentPingResponse: + async def deploy_up(self, request: Request) -> Response: """ - Health check endpoint. + Start the deployment (docker compose up) asynchronously. + + Returns 202 Accepted with task_id for tracking progress. + + Args: + request: FastAPI request object (for building Location header). Returns: - DeploymentPingResponse indicating API is operational. + Response with 202 status and TaskResponse body. 
""" - logger.debug("Ping request received") - return DeploymentPingResponse(success=True, message="API is operational") + logger.info("Starting deployment (up) - asynchronous") + + # Create async task + task_id = await self.task_manager.create_task( + operation="up", + func=lambda: self._execute_compose_up(task_id), + ) + # Build Location header + location = str(request.url_for("get_task_status", task_id=task_id)) - async def deploy_up(self) -> DeploymentActionResponse: + logger.info(f"Deployment up task created: {task_id}") + + # Return 202 Accepted + return Response( + status_code=202, + content=TaskResponse( + task_id=task_id, status=TaskStatus.RUNNING + ).model_dump_json(), + media_type="application/json", + headers={"Location": location}, + ) + + async def deploy_stop(self, request: Request) -> Response: """ - Start the deployment (docker compose up). + Stop the deployment (docker compose stop) asynchronously. + + Returns 202 Accepted with task_id for tracking progress. + + Args: + request: FastAPI request object (for building Location header). Returns: - DeploymentActionResponse with success status and message. + Response with 202 status and TaskResponse body. + """ + logger.info("Stopping deployment - asynchronous") - Raises: - HTTPException: If deployment startup fails. + # Create async task + task_id = await self.task_manager.create_task( + operation="stop", + func=lambda: self._execute_compose_stop(task_id), + ) + + # Build Location header + location = str(request.url_for("get_task_status", task_id=task_id)) + + logger.info(f"Deployment stop task created: {task_id}") + + # Return 202 Accepted + return Response( + status_code=202, + content=TaskResponse( + task_id=task_id, status=TaskStatus.RUNNING + ).model_dump_json(), + media_type="application/json", + headers={"Location": location}, + ) + + async def deploy_down(self, request: Request) -> Response: """ - try: - logger.info("Starting deployment (up)") - result: ComposeActionResponse = self.config.docker_compose_up() + Down the deployment (docker compose down) asynchronously. - if not result.success: - logger.error(f"Failed to start deployment: {result.message}") - raise HTTPException( - status_code=500, detail=result.message - ) + Returns 202 Accepted with task_id for tracking progress. - logger.info("Deployment started successfully") - return DeploymentActionResponse( - success=result.success, - action="up", - message=result.message - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error starting deployment: {e}") - raise HTTPException( - status_code=500, detail=f"Failed to start deployment: {str(e)}" - ) + Args: + request: FastAPI request object (for building Location header). - async def deploy_stop(self) -> DeploymentActionResponse: + Returns: + Response with 202 status and TaskResponse body. + """ + logger.info("Taking down deployment - asynchronous") + + # Create async task + task_id = await self.task_manager.create_task( + operation="down", + func=lambda: self._execute_compose_down(task_id), + ) + + # Build Location header + location = str(request.url_for("get_task_status", task_id=task_id)) + + logger.info(f"Deployment down task created: {task_id}") + + # Return 202 Accepted + return Response( + status_code=202, + content=TaskResponse( + task_id=task_id, status=TaskStatus.RUNNING + ).model_dump_json(), + media_type="application/json", + headers={"Location": location}, + ) + async def deploy_kill(self, request: Request) -> Response: """ - Stop the deployment (docker compose stop). 
+ Kill the deployment (docker compose kill) asynchronously. + + Returns 202 Accepted with task_id for tracking progress. + + Args: + request: FastAPI request object (for building Location header). Returns: - DeploymentActionResponse with success status and message. + Response with 202 status and TaskResponse body. + """ + logger.info("Killing deployment - asynchronous") - Raises: - HTTPException: If deployment stop fails. + # Create async task + task_id = await self.task_manager.create_task( + operation="kill", + func=lambda: self._execute_compose_kill(task_id), + ) + + # Build Location header + location = str(request.url_for("get_task_status", task_id=task_id)) + + logger.info(f"Deployment kill task created: {task_id}") + + # Return 202 Accepted + return Response( + status_code=202, + content=TaskResponse( + task_id=task_id, status=TaskStatus.RUNNING + ).model_dump_json(), + media_type="application/json", + headers={"Location": location}, + ) + + async def deploy_restart(self, request: Request) -> Response: """ - try: - logger.info("Stopping deployment") - result: ComposeActionResponse = self.config.docker_compose_stop() + Restart the deployment (docker compose stop + up) asynchronously. - if not result.success: - logger.error(f"Failed to stop deployment: {result.message}") - raise HTTPException( - status_code=500, detail=result.message - ) + Returns 202 Accepted with task_id for tracking progress. - logger.info("Deployment stopped successfully") - return DeploymentActionResponse( - success=result.success, - action="stop", - message=result.message - ) - except HTTPException: - raise - except Exception as e: - logger.error(f"Error stopping deployment: {e}") - raise HTTPException( - status_code=500, detail=f"Failed to stop deployment: {str(e)}" - ) + Args: + request: FastAPI request object (for building Location header). - async def deploy_down(self) -> DeploymentActionResponse: + Returns: + Response with 202 status and TaskResponse body. """ - Down the deployment (docker compose down and remove volumes). + logger.info("Restarting deployment - asynchronous") + + # Create async task + task_id = await self.task_manager.create_task( + operation="restart", + func=lambda: self._execute_compose_restart(task_id), + ) + + # Build Location header + location = str(request.url_for("get_task_status", task_id=task_id)) + + logger.info(f"Deployment restart task created: {task_id}") + + # Return 202 Accepted + return Response( + status_code=202, + content=TaskResponse( + task_id=task_id, status=TaskStatus.RUNNING + ).model_dump_json(), + media_type="application/json", + headers={"Location": location}, + ) + + async def get_task_status(self, task_id: str) -> TaskDetail: + """ + Get the current status of a deployment task. + + Args: + task_id: The unique identifier of the task. Returns: - DeploymentActionResponse with success status and message. + TaskDetail with current task status, progress, and result. Raises: - HTTPException: If deployment down fails. + HTTPException: If task not found. """ - try: - logger.info("Downing deployment (removing containers and volumes)") - result: ComposeActionResponse = self.config.docker_compose_down() + task = self.task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + + return task + async def stream_task_progress(self, task_id: str) -> StreamingResponse: + """ + Stream task progress updates via Server-Sent Events (SSE). + + Args: + task_id: The unique identifier of the task. 
+ + Returns: + StreamingResponse with SSE stream of task updates. + + Raises: + HTTPException: If task not found. + """ + task = self.task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + + async def event_generator() -> AsyncIterator[str]: + """Generate SSE events for task updates.""" + last_update = None + + while True: + current_task = self.task_manager.get_task(task_id) + if not current_task: + yield f"event: error\ndata: {json.dumps({'error': 'Task not found'})}\n\n" + break + + # Send update if task changed + if current_task.model_dump() != last_update: + last_update = current_task.model_dump() + yield f"data: {current_task.model_dump_json()}\n\n" + + # If task completed or failed, send final event and stop + if current_task.task_status in [ + TaskStatus.COMPLETED, + TaskStatus.FAILED, + ]: + yield f"event: done\ndata: {current_task.model_dump_json()}\n\n" + break + + # Wait before next check + await asyncio.sleep(0.5) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + def _execute_compose_up(self, task_id: str) -> None: + """ + Execute docker compose up. + + Runs in thread executor. TaskManager auto-manages state transitions: + PENDING -> RUNNING (on start) -> COMPLETED (success) or FAILED (exception). + Any exception is caught and its message stored in task.error. + """ + try: + logger.info(f"[{task_id}] Executing compose up") + result = self.config.docker_compose_up() if not result.success: - logger.error(f"Failed to down deployment: {result.message}") - raise HTTPException( - status_code=500, detail=result.message - ) + raise Exception(result.message) + logger.info(f"[{task_id}] Compose up completed successfully") - logger.info("Deployment downed successfully") - return DeploymentActionResponse( - success=result.success, - action="down", - message=result.message - ) - except HTTPException: - raise except Exception as e: - logger.error(f"Error downing deployment: {e}") - raise HTTPException( - status_code=500, detail=f"Failed to down deployment: {str(e)}" - ) + logger.error(f"[{task_id}] Error executing compose up: {e}") + raise + - async def deploy_restart(self) -> DeploymentActionResponse: + def _execute_compose_stop(self, task_id: str) -> None: """ - Restart the deployment (docker compose down then up). + Execute docker compose stop. - Returns: - DeploymentActionResponse with success status and message. + Runs in thread executor. TaskManager auto-manages state transitions: + PENDING -> RUNNING (on start) -> COMPLETED (success) or FAILED (exception). + Any exception is caught and its message stored in task.error. + """ + try: + logger.info(f"[{task_id}] Executing compose stop") + result = self.config.docker_compose_stop() + if not result.success: + raise Exception(result.message) + logger.info(f"[{task_id}] Compose stop completed successfully") - Raises: - HTTPException: If deployment restart fails. + except Exception as e: + logger.error(f"[{task_id}] Error executing compose stop: {e}") + raise + + def _execute_compose_down(self, task_id: str) -> None: + """ + Execute docker compose down. + + Runs in thread executor. TaskManager auto-manages state transitions: + PENDING -> RUNNING (on start) -> COMPLETED (success) or FAILED (exception). + Any exception is caught and its message stored in task.error. 
""" try: - logger.info("Restarting deployment") - result: ComposeActionResponse = self.config.docker_compose_restart() + logger.info(f"[{task_id}] Executing compose down") + result = self.config.docker_compose_down() + if not result.success: + raise Exception(result.message) + logger.info(f"[{task_id}] Compose down completed successfully") + + except Exception as e: + logger.error(f"[{task_id}] Error executing compose down: {e}") + raise + def _execute_compose_kill(self, task_id: str) -> None: + """ + Execute docker compose kill. + + Runs in thread executor. TaskManager auto-manages state transitions: + PENDING -> RUNNING (on start) -> COMPLETED (success) or FAILED (exception). + Any exception is caught and its message stored in task.error. + """ + try: + logger.info(f"[{task_id}] Executing compose kill") + result = self.config.docker_compose_kill() if not result.success: - logger.error(f"Failed to stop deployment (while restarting): {result.message}") - raise HTTPException( - status_code=500, detail=result.message - ) + raise Exception(result.message) + logger.info(f"[{task_id}] Compose kill completed successfully") - logger.info("Deployment restarted successfully") - return DeploymentActionResponse( - success=result.success, - action="restart", - message=result.message - ) - except HTTPException: + except Exception as e: + logger.error(f"[{task_id}] Error executing compose kill: {e}") raise + + def _execute_compose_restart(self, task_id: str) -> None: + """ + Execute docker compose restart (stop + up). + + Runs in thread executor. TaskManager auto-manages state transitions: + PENDING -> RUNNING (on start) -> COMPLETED (success) or FAILED (exception). + Any exception is caught and its message stored in task.error. + """ + try: + logger.info(f"[{task_id}] Executing compose restart") + stop_result = self.config.docker_compose_stop() + if not stop_result.success: + raise Exception(stop_result.message) + up_result = self.config.docker_compose_up() + if not up_result.success: + raise Exception(up_result.message) + logger.info(f"[{task_id}] Compose restart completed successfully") + except Exception as e: - logger.error(f"Error restarting deployment: {e}") - raise HTTPException( - status_code=500, detail=f"Failed to restart deployment: {str(e)}" - ) + logger.error(f"[{task_id}] Error executing compose restart: {e}") + raise diff --git a/src/base_deployment_controller/routers/environment.py b/src/base_deployment_controller/routers/environment.py index d204ae2..2caccdc 100644 --- a/src/base_deployment_controller/routers/environment.py +++ b/src/base_deployment_controller/routers/environment.py @@ -1,9 +1,15 @@ """ Environment variables routes implemented with a class and dependency injection. """ +import asyncio +import json import logging +import threading +import time +from typing import AsyncIterator, Dict, Optional, Set -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Request +from fastapi.responses import Response, StreamingResponse from ..models.environment import ( EnvVariable, @@ -11,7 +17,9 @@ BulkEnvUpdateRequest, EnvUpdateResponse, ) +from ..models.task import TaskResponse, TaskDetail, TaskStatus from ..services.config import ConfigService +from ..services.task_manager import TaskManager logger = logging.getLogger(__name__) @@ -21,24 +29,29 @@ class EnvRoutes: Environment variables router built with dependency injection. Provides endpoints for retrieving and updating environment variables - defined in the compose.yaml x-env-vars schema. 
+ defined in the compose.yaml x-env-vars schema. Updates are async when + services need to be restarted. Args: config: Instance of `ConfigService` for file access and validation. + task_manager: Instance of `TaskManager` for async operations. Attributes: config: Injected configuration service. + task_manager: Injected task manager service. router: Instance of `APIRouter` with `/envs` endpoints. """ - def __init__(self, config: ConfigService) -> None: + def __init__(self, config: ConfigService, task_manager: TaskManager) -> None: """ Initialize environment routes. Args: config: Configuration service instance for dependency injection. + task_manager: Task manager instance for dependency injection. """ self.config = config + self.task_manager = task_manager self.router = self._build_router() def _build_router(self) -> APIRouter: @@ -59,7 +72,20 @@ def _build_router(self) -> APIRouter: "", self.update_environment_variables, methods=["PUT"], - response_model=EnvUpdateResponse, + status_code=202, + ) + # GET /envs/tasks/{task_id} - get task status + router.add_api_route( + "/tasks/{task_id}", + self.get_task_status, + methods=["GET"], + response_model=TaskDetail, + ) + # GET /envs/tasks/{task_id}/stream - SSE stream + router.add_api_route( + "/tasks/{task_id}/stream", + self.stream_task_progress, + methods=["GET"], ) return router @@ -103,16 +129,20 @@ async def get_environment_variables(self) -> EnvVariablesResponse: ) async def update_environment_variables( - self, request: BulkEnvUpdateRequest - ) -> EnvUpdateResponse: + self, request: BulkEnvUpdateRequest, fastapi_request: Request + ) -> Response: """ - Update environment variables in .env file. + Update environment variables in .env file asynchronously. + + When restart_services is True, the operation runs asynchronously + and returns 202 Accepted with a task_id. Args: - request: Bulk update request. + request: Bulk update request with variables and restart flag. + fastapi_request: FastAPI request object (for building Location header). Returns: - EnvUpdateResponse with list of updated variables and restart results. + Response with 202 status and TaskResponse body. Raises: HTTPException: If validation fails or variables cannot be updated. 
@@ -123,6 +153,8 @@ async def update_environment_variables( logger.debug( "Bulk environment update request with %d variables", len(updates) ) + + # Validate all variables first for var_name, var_value in updates.items(): if var_name not in schema: logger.warning(f"Attempted to add unknown variable: {var_name}") @@ -135,40 +167,176 @@ async def update_environment_variables( self.config.validate_variable_value( var_name, var_value, var_schema["type"] ) - logger.debug(f"Validated variable {var_name} with value: {var_value}") + logger.debug( + f"Validated variable {var_name} with value: {var_value}" + ) except ValueError as e: logger.warning(f"Validation failed for {var_name}: {e}") raise HTTPException(status_code=400, detail=str(e)) + logger.info(f"Updating {len(updates)} environment variables") + + # Create async task for update + restart + task_id = await self.task_manager.create_task( + operation="env_update", + func=lambda: self._execute_env_update( + task_id, updates, request.restart_services + ), + ) + + # Build Location header + location = str(fastapi_request.url_for("get_task_status", task_id=task_id)) + + logger.info(f"Environment update task created: {task_id}") + + # Return 202 Accepted + return Response( + status_code=202, + content=TaskResponse( + task_id=task_id, status=TaskStatus.RUNNING + ).model_dump_json(), + media_type="application/json", + headers={"Location": location}, + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to create env update task: {e}") + raise HTTPException( + status_code=500, detail=f"Failed to update environment variables: {e}" + ) + + async def get_task_status(self, task_id: str) -> TaskDetail: + """ + Get the current status of an environment update task. + + Args: + task_id: The unique identifier of the task. + + Returns: + TaskDetail with current task status, progress, and result. + + Raises: + HTTPException: If task not found. + """ + task = self.task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + + return task + + async def stream_task_progress(self, task_id: str) -> StreamingResponse: + """ + Stream task progress updates via Server-Sent Events (SSE). + + Args: + task_id: The unique identifier of the task. + + Returns: + StreamingResponse with SSE stream of task updates. + + Raises: + HTTPException: If task not found. 
+ """ + task = self.task_manager.get_task(task_id) + if not task: + raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + + async def event_generator() -> AsyncIterator[str]: + """Generate SSE events for task updates.""" + last_update = None + + while True: + current_task = self.task_manager.get_task(task_id) + if not current_task: + yield f"event: error\ndata: {json.dumps({'error': 'Task not found'})}\n\n" + break + + # Send update if task changed + if current_task.model_dump() != last_update: + last_update = current_task.model_dump() + yield f"data: {current_task.model_dump_json()}\n\n" + + # If task completed or failed, send final event and stop + if current_task.task_status in [ + TaskStatus.COMPLETED, + TaskStatus.FAILED, + ]: + yield f"event: done\ndata: {current_task.model_dump_json()}\n\n" + break + + # Wait before next check + await asyncio.sleep(0.5) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + # Helper methods for executing environment operations + + # Service event monitoring removed; use /containers/events SSE for container status changes. + + def _execute_env_update( + self, + task_id: str, + updates: dict[str, str], + restart_services: bool, + ) -> EnvUpdateResponse: + """ + Execute environment variable update with optional service restart. + + Runs in thread executor. TaskManager auto-manages state transitions: + PENDING -> RUNNING (on start) -> COMPLETED (success) or FAILED (exception). + Any exception is caught and its message stored in task.error. + + Args: + task_id: Task identifier (for logging). + updates: Dict of variables to update. + restart_services: Whether to restart affected services. + + Returns: + EnvUpdateResponse with update results. + + Raises: + Exception: On update/restart failure. Message stored in task.error. + """ + try: + # Update .env file + logger.info(f"[{task_id}] Updating {len(updates)} environment variables") self.config.update_env_file(updates) - # Restart affected services - affected_services = self.config.get_affected_services(list(updates.keys())) - logger.debug(f"Affected services: {affected_services}") + # Build updated variables list for response + updated_var_names = list(updates.keys()) + + # Restart services if requested restart_results: dict[str, bool] = {} - if request.restart_services: + if restart_services: + # Compute affected services and restart via ConfigService + affected_services = self.config.get_affected_services(updated_var_names) + logger.info(f"[{task_id}] Restarting {len(affected_services)} affected services") restart_results = self.config.restart_services(affected_services) - logger.info( - "Successfully updated %d variables. Restart results: %s", - len(updates), - restart_results, - ) - else: - logger.info( - "Successfully updated %d variables. 
Restart skipped by request", - len(updates), - ) + logger.info(f"[{task_id}] Environment update completed successfully") return EnvUpdateResponse( success=True, - updated=list(updates.keys()), - message="Variables updated successfully", + message=f"Updated {len(updates)} environment variables", + updated=updated_var_names, restarted_services=restart_results, ) - except HTTPException: - raise except Exception as e: - logger.error(f"Failed to update variables: {e}") - raise HTTPException( - status_code=500, detail=f"Failed to update variables: {e}" + logger.error(f"[{task_id}] Error executing environment update: {e}") + return EnvUpdateResponse( + success=False, + message=f"Failed to update environment variables: {str(e)}", + updated=[], + restarted_services={}, ) + + # Service-state updates removed; tasks no longer track per-service states. diff --git a/src/base_deployment_controller/services/config.py b/src/base_deployment_controller/services/config.py index 31f7910..38bc399 100644 --- a/src/base_deployment_controller/services/config.py +++ b/src/base_deployment_controller/services/config.py @@ -491,6 +491,27 @@ def docker_compose_down(self) -> ComposeActionResponse: success=False, message=str(e) ) + + def docker_compose_kill(self) -> ComposeActionResponse: + """ + Execute docker compose kill and return the result. + + Returns: + ComposeActionResponse with success status and message. + """ + try: + client = self.get_docker_client() + client.compose.kill() + return ComposeActionResponse( + success=True, + message="Deployment killed successfully", + ) + except Exception as e: + logger.error(f"Failed to kill deployment: {e}") + return ComposeActionResponse( + success=False, + message=str(e), + ) def docker_compose_restart(self) -> ComposeActionResponse: """ diff --git a/src/base_deployment_controller/services/status_event_manager.py b/src/base_deployment_controller/services/status_event_manager.py new file mode 100644 index 0000000..f60db43 --- /dev/null +++ b/src/base_deployment_controller/services/status_event_manager.py @@ -0,0 +1,156 @@ +""" +StatusEventManager: on-demand Docker events monitor with SSE subscribers. +""" +import threading +import logging +import time +from datetime import datetime, timezone +from typing import List, Dict, Optional +from queue import Queue, Empty + +import asyncio + +from ..models.events import ContainerStatusEvent, ServiceState +from ..services.config import ConfigService + +logger = logging.getLogger(__name__) + + +class StatusEventManager: + """ + Manages a single Docker events monitor and broadcasts container status events + to subscribed SSE clients. Starts when the first subscriber connects and stops + when there are no subscribers. 
+ """ + + def __init__(self, config: ConfigService) -> None: + self.config = config + self._thread: Optional[threading.Thread] = None + self._stop_event = threading.Event() + self._subscribers: List[Queue] = [] + self._lock = threading.Lock() + self._last_state: Dict[str, ServiceState] = {} + + def _ensure_started(self) -> None: + with self._lock: + if self._thread and self._thread.is_alive(): + return + # Reset stop event + self._stop_event.clear() + # Start monitor thread + self._thread = threading.Thread(target=self._monitor_loop, daemon=True) + self._thread.start() + logger.info("StatusEventManager: monitor started") + + def _maybe_stop(self) -> None: + with self._lock: + if self._subscribers: + return + if self._thread and self._thread.is_alive(): + self._stop_event.set() + self._thread.join(timeout=5) + self._thread = None + logger.info("StatusEventManager: monitor stopped") + + def subscribe(self) -> Queue: + """Add a new subscriber and ensure the monitor is running.""" + q: Queue = Queue() + with self._lock: + self._subscribers.append(q) + self._ensure_started() + return q + + def unsubscribe(self, q: Queue) -> None: + """Remove subscriber and stop monitor if none left.""" + with self._lock: + if q in self._subscribers: + self._subscribers.remove(q) + self._maybe_stop() + + async def get_event(self, q: Queue) -> ContainerStatusEvent: + """ + Async helper to get next event from a subscriber queue. + + Polls the queue with short timeout to allow cancellation checks. + """ + loop = asyncio.get_event_loop() + while True: + try: + # Use run_in_executor to avoid blocking + event = await loop.run_in_executor(None, q.get, True, 0.1) + return event + except Empty: + # Queue empty, yield control and retry + await asyncio.sleep(0.05) + except Exception as e: + logger.debug(f"Error getting event from queue: {e}") + await asyncio.sleep(0.05) + + def _broadcast(self, event: ContainerStatusEvent) -> None: + # Snapshot subscribers to avoid holding lock while putting + with self._lock: + subscribers = list(self._subscribers) + for q in subscribers: + try: + q.put(event, timeout=0.1) + except Exception: + pass + + def _monitor_loop(self) -> None: + """Background thread: listen to Docker events and broadcast mapped state events.""" + try: + logger.info("StatusEventManager: starting Docker event monitor") + docker = self.config.get_docker_client() + action_to_state = { + "kill": ServiceState.STOPPING, + "stop": ServiceState.STOPPED, + "die": ServiceState.STOPPED, + "create": ServiceState.CREATING, + "start": ServiceState.STARTING, + "health_status: healthy": ServiceState.STARTED, + "destroy": ServiceState.REMOVED, + "pull": ServiceState.PULLING, + "build": ServiceState.CREATING, + } + + logger.info("StatusEventManager: listening to docker.system.events()") + for event in docker.system.events(filters={"type": "container"}): + if self._stop_event.is_set(): + logger.info("StatusEventManager: stop event received, breaking") + break + try: + action = getattr(event, "action", "").lower() + actor = getattr(event, "actor", None) + attributes = getattr(actor, "attributes", {}) if actor else {} + name = attributes.get("name") + + if not name: + logger.debug(f"StatusEventManager: skipping event with no name, action={action}") + continue + + new_state = action_to_state.get(action) + if not new_state: + logger.debug(f"StatusEventManager: unmapped action '{action}' for {name}") + continue + + prev_state = self._last_state.get(name) + self._last_state[name] = new_state + + logger.info(f"StatusEventManager: {name} 
state={new_state} (action={action})") + + ev = ContainerStatusEvent( + container_name=name, + state=new_state, + prev_state=prev_state, + action=action, + timestamp=datetime.now(timezone.utc), + ) + self._broadcast(ev) + except Exception as e: + logger.error(f"StatusEventManager: error processing event: {e}", exc_info=True) + # Yield to avoid tight loop + time.sleep(0.01) + except Exception as e: + logger.error(f"StatusEventManager: monitor loop error: {e}", exc_info=True) + finally: + logger.info("StatusEventManager: monitor loop exiting") diff --git a/src/base_deployment_controller/services/task_manager.py b/src/base_deployment_controller/services/task_manager.py new file mode 100644 index 0000000..093da7e --- /dev/null +++ b/src/base_deployment_controller/services/task_manager.py @@ -0,0 +1,195 @@ +""" +Task manager for executing Docker operations asynchronously. +""" +import asyncio +import logging +import uuid +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Dict, Optional + +from ..models.task import TaskDetail, TaskStatus + +logger = logging.getLogger(__name__) + + +class TaskManager: + """ + Manages background tasks for Docker operations. + + Executes blocking Docker commands in a thread pool executor to avoid + blocking FastAPI's event loop. Tracks task state, service states, and + provides cleanup for completed tasks. + + Attributes: + tasks: Dictionary mapping task_id to TaskDetail. + ttl: Time-to-live for completed tasks before cleanup (seconds). + """ + + def __init__(self, ttl: int = 3600): + """ + Initialize task manager. + + Args: + ttl: Time-to-live in seconds for completed tasks (default: 1 hour). + """ + self.tasks: Dict[str, TaskDetail] = {} + self.ttl = ttl + self._lock = asyncio.Lock() + logger.info(f"TaskManager initialized with TTL={ttl}s") + + async def create_task( + self, + operation: str, + func: Callable[[], Any], + ) -> str: + """ + Create and start a background task. + + Args: + operation: Operation name (e.g., "up", "down", "start", "stop"). + func: Synchronous function to execute (Docker command). + + Returns: + task_id: Unique identifier for the created task. + """ + task_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc) + + async with self._lock: + self.tasks[task_id] = TaskDetail( + task_id=task_id, + task_status=TaskStatus.PENDING, + operation=operation, + error=None, + created_at=now, + updated_at=now, + completed_at=None, + ) + + logger.info(f"Task {task_id} created for operation '{operation}'") + + # Start background execution + asyncio.create_task(self._execute_task(task_id, func)) + + return task_id + + async def _execute_task(self, task_id: str, func: Callable[[], Any]) -> None: + """ + Execute a task in a thread pool executor. + + Args: + task_id: Task identifier. + func: Synchronous function to execute. 
+ """ + try: + # Update status to running + await self._update_task_status(task_id, TaskStatus.RUNNING) + logger.debug(f"Task {task_id} starting execution") + + # Execute blocking function in executor + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(None, func) + + # Mark as completed + await self._update_task_status( + task_id, TaskStatus.COMPLETED, completed=True + ) + logger.info(f"Task {task_id} completed successfully") + + except Exception as e: + logger.error(f"Task {task_id} failed: {e}", exc_info=True) + await self._update_task_status( + task_id, TaskStatus.FAILED, error=str(e), completed=True + ) + + async def _update_task_status( + self, + task_id: str, + status: TaskStatus, + error: Optional[str] = None, + completed: bool = False, + ) -> None: + """ + Update task status. + + Args: + task_id: Task identifier. + status: New task status. + error: Optional error message. + completed: Whether the task is completed. + """ + async with self._lock: + if task_id in self.tasks: + task = self.tasks[task_id] + task.task_status = status + task.updated_at = datetime.now(timezone.utc) + if error: + task.error = error + if completed: + task.completed_at = datetime.now(timezone.utc) + logger.debug(f"Task {task_id} status updated to {status}") + + def get_task(self, task_id: str) -> Optional[TaskDetail]: + """ + Retrieve task details (synchronous for simplicity). + + Args: + task_id: Task identifier. + + Returns: + TaskDetail if found, None otherwise. + """ + return self.tasks.get(task_id) + + async def cleanup_old_tasks(self) -> None: + """ + Remove completed tasks older than TTL. + """ + cutoff = datetime.now(timezone.utc) - timedelta(seconds=self.ttl) + async with self._lock: + to_remove = [ + task_id + for task_id, task in self.tasks.items() + if task.completed_at and task.completed_at < cutoff + ] + for task_id in to_remove: + del self.tasks[task_id] + logger.debug(f"Cleaned up task {task_id}") + + if to_remove: + logger.info(f"Cleaned up {len(to_remove)} old tasks") + + async def get_task_stream(self, task_id: str): + """ + Generator for SSE streaming of task updates. + + Args: + task_id: Task identifier. + + Yields: + TaskDetail snapshots as the task progresses. + """ + if task_id not in self.tasks: + logger.warning(f"Stream requested for non-existent task {task_id}") + return + + last_update = None + + while True: + task = self.get_task(task_id) + if not task: + logger.debug(f"Task {task_id} no longer exists, ending stream") + break + + # Yield if there's an update + if last_update is None or task.updated_at > last_update: + last_update = task.updated_at + yield task + + # Exit if task is completed + if task.task_status in (TaskStatus.COMPLETED, TaskStatus.FAILED): + logger.debug(f"Task {task_id} finished, ending stream") + break + + # Wait before next check + await asyncio.sleep(0.5) diff --git a/test_events_endpoint.py b/test_events_endpoint.py new file mode 100644 index 0000000..a0e5a33 --- /dev/null +++ b/test_events_endpoint.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python +""" +Test script for /containers/events SSE endpoint. + +This script: +1. Starts the FastAPI app +2. Connects to /containers/events SSE endpoint +3. Triggers a container action (start/stop) +4. 
Verifies that ContainerStatusEvent is received with correct data +""" +import asyncio +import json +import subprocess +import time +import sys +import signal +from pathlib import Path + +import httpx +import requests + + +async def test_events_endpoint(): + """Test /containers/events endpoint with a real container action.""" + + api_url = "http://127.0.0.1:8000" + + print("=" * 70) + print("Testing /containers/events SSE endpoint") + print("=" * 70) + + # Wait for server to be ready + print("\n1. Waiting for API server to be ready...") + for i in range(30): + try: + resp = requests.get(f"{api_url}/", timeout=2) + if resp.status_code == 200: + print("✓ API server is ready") + break + except: + pass + print(f" Attempt {i+1}/30...") + time.sleep(1) + else: + print("✗ API server did not start") + return False + + # Get list of containers + print("\n2. Getting list of containers...") + resp = requests.get(f"{api_url}/containers") + if resp.status_code != 200: + print(f"✗ Failed to get containers: {resp.status_code}") + return False + + containers = resp.json().get("containers", []) + if not containers: + print("⚠ No containers available for testing") + return False + + test_container = containers[0]["name"] + print(f"✓ Found {len(containers)} containers, testing with: {test_container}") + + # Start SSE stream in background + print("\n3. Connecting to /containers/events SSE stream...") + events_received = [] + stream_error = None + + def stream_events(): + nonlocal stream_error + try: + with requests.get(f"{api_url}/containers/events", stream=True, timeout=30) as resp: + if resp.status_code != 200: + stream_error = f"SSE endpoint returned {resp.status_code}" + return + + print(f"✓ SSE stream opened (status {resp.status_code})") + + for line in resp.iter_lines(): + if not line or line.startswith(b':'): + continue + + if line.startswith(b'data:'): + json_str = line[5:].strip().decode('utf-8') + try: + event_data = json.loads(json_str) + events_received.append(event_data) + print(f" ✓ Event received: {event_data.get('container_name')} -> {event_data.get('state')}") + except json.JSONDecodeError as e: + print(f" ✗ Failed to parse event JSON: {e}") + except Exception as e: + stream_error = str(e) + + # Run stream in thread + import threading + stream_thread = threading.Thread(target=stream_events, daemon=True) + stream_thread.start() + + # Give stream time to connect + time.sleep(2) + + if stream_error: + print(f"✗ SSE stream error: {stream_error}") + return False + + # Trigger a container action + print(f"\n4. Triggering container action on {test_container}...") + try: + resp = requests.post( + f"{api_url}/containers/{test_container}/restart", + timeout=5 + ) + print(f" POST /containers/{test_container}/restart -> {resp.status_code}") + if resp.status_code == 202: + print(f" ✓ Task created: {resp.json().get('task_id')}") + except Exception as e: + print(f" ✗ Failed to trigger action: {e}") + + # Wait for events + print("\n5. Waiting for container events (10 seconds)...") + for i in range(10): + if events_received: + print(f"✓ Received {len(events_received)} event(s)") + break + print(f" Waiting... ({i+1}/10)") + time.sleep(1) + + # Verify events + print("\n6. 
Verifying event structure...") + if not events_received: + print("⚠ No events received (Docker daemon may not emit events during test)") + print(" (This may be expected if containers aren't actually restarting)") + return True + + for i, event in enumerate(events_received): + print(f"\n Event #{i+1}:") + required_fields = ["container_name", "state", "action", "timestamp"] + for field in required_fields: + if field in event: + print(f" ✓ {field}: {event[field]}") + else: + print(f" ✗ MISSING {field}") + return False + + if "prev_state" in event: + print(f" ✓ prev_state: {event['prev_state']}") + + print("\n" + "=" * 70) + print("✓ All tests passed!") + print("=" * 70) + return True + + +if __name__ == "__main__": + try: + result = asyncio.run(test_events_endpoint()) + sys.exit(0 if result else 1) + except KeyboardInterrupt: + print("\n\nTest interrupted by user") + sys.exit(1) + except Exception as e: + print(f"\n\nTest failed with error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) diff --git a/tests/conftest.py b/tests/conftest.py index 0636774..e38c7f8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,16 +4,28 @@ import os import subprocess import time +import json from pathlib import Path +from typing import Dict, Any, AsyncIterator import pytest import requests @pytest.fixture(scope="session") -def api_url(): - """Base URL for the API.""" - return "http://localhost:8000" +def server_port() -> int: + """Find and reserve a free TCP port for the test server.""" + import socket + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + return s.getsockname()[1] + + +@pytest.fixture(scope="session") +def api_url(server_port: int): + """Base URL for the API using a dynamically allocated port.""" + return f"http://127.0.0.1:{server_port}" @pytest.fixture(scope="session") @@ -22,8 +34,35 @@ def compose_data_dir(): return Path("data") +def wait_for_api_ready(api_url: str, timeout: int = 60) -> bool: + """ + Wait for API to be ready, checking the root endpoint. + + Args: + api_url: Base API URL + timeout: Maximum wait time in seconds + + Returns: + True if API is ready, raises otherwise + """ + session = requests.Session() + start_time = time.time() + + while time.time() - start_time < timeout: + try: + response = session.get(f"{api_url}/", timeout=2) + if response.status_code == 200: + data = response.json() + if data.get("status") == "operational": + return True + except requests.RequestException: + time.sleep(0.5) + + pytest.fail(f"Server did not start within {timeout} seconds") + + @pytest.fixture(scope="session") -def api_server(api_url): +def api_server(api_url, server_port: int): """ Start the FastAPI server for the test session. 
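The `test_events_endpoint.py` script above reads `/containers/events` with blocking `requests` calls on a worker thread. An asynchronous consumer (for example, another service watching this controller) could read the same stream with `httpx`, which the script already imports. A minimal sketch (not part of the diff), assuming only the event fields the script verifies (`container_name`, `state`, `action`, `timestamp`):

```python
import asyncio
import json

import httpx


async def watch_container_events(base: str = "http://localhost:8000") -> None:
    """Print container state transitions from the /containers/events SSE stream."""
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", f"{base}/containers/events") as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                # SSE payload lines look like "data: {...}"; skip comments and blanks.
                if not line.startswith("data:"):
                    continue
                event = json.loads(line[len("data:"):].strip())
                print(f"{event['container_name']}: {event.get('prev_state')} -> {event['state']}")


if __name__ == "__main__":
    asyncio.run(watch_container_events())
```

Because `StatusEventManager` starts its Docker events thread on the first subscription and stops it when the last subscriber unsubscribes, closing this stream is expected to shut the monitor down again.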
@@ -41,7 +80,7 @@ def api_server(api_url): process = subprocess.Popen( [sys.executable, "-m", "uvicorn", "main:app", - "--host", "0.0.0.0", "--port", "8000"], + "--host", "127.0.0.1", "--port", str(server_port)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, @@ -49,21 +88,7 @@ def api_server(api_url): ) # Wait for server to be ready - session = requests.Session() - start_time = time.time() - timeout = 60 - - while time.time() - start_time < timeout: - try: - response = session.get(f"{api_url}/ping", timeout=2) - if response.status_code == 200: - break - except requests.RequestException: - time.sleep(0.5) - else: - process.terminate() - process.wait() - pytest.fail(f"Server did not start within {timeout} seconds") + wait_for_api_ready(api_url) yield process @@ -128,3 +153,8 @@ def check_dependencies(compose_data_dir): pytest.fail(f"Compose file not found: {compose_file}") return True + + +# Migrate test helper to tests/utils.py for importability in tests +from tests.utils import stream_task_updates # noqa: F401 + diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..2d16cee --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,27 @@ +""" +Test cases for the API root endpoint (GET /). +""" +import pytest + + +class TestAPIEndpoint: + """API root endpoint tests.""" + + def test_api_root_operational(self, api_client, api_url, api_server, check_dependencies): + """ + Verify that the API root endpoint returns operational status. + + Tests: + - GET / returns status 200 + - Response contains "status": "operational" + - Response contains valid API information + """ + resp = api_client.get(f"{api_url}/") + + assert resp.status_code == 200, "Root endpoint should return 200" + + data = resp.json() + assert data.get("status") == "operational", "API status should be 'operational'" + assert "name" in data, "Response should contain API name" + assert "message" in data, "Response should contain status message" + assert data.get("name") == "Base Deployment Controller", "API name should be 'Base Deployment Controller'" diff --git a/tests/test_api_integration.py b/tests/test_api_integration.py deleted file mode 100644 index 3374efa..0000000 --- a/tests/test_api_integration.py +++ /dev/null @@ -1,191 +0,0 @@ -""" -Integration tests for base-deployment-controller API. 
- -Covers: -- Deployment lifecycle (up/down) -- Container management and restart with started_at validation -- Environment variables CRUD -- WebSocket logs -""" -import time -from datetime import datetime, timezone -import asyncio -import pytest -import websockets - - -class TestDeploymentEndpoints: - """Deployment endpoints.""" - - def test_ping(self, api_client, api_url, check_dependencies): - resp = api_client.get(f"{api_url}/ping") - assert resp.status_code == 200 - data = resp.json() - assert data.get("success") is True - assert "message" in data - - def test_deployment_info(self, api_client, api_url): - resp = api_client.get(f"{api_url}/") - assert resp.status_code == 200 - data = resp.json() - assert "status" in data - assert "metadata" in data - assert "env_vars" in data - - def test_deployment_up(self, api_client, api_url): - resp = api_client.post(f"{api_url}/up") - assert resp.status_code == 200 - data = resp.json() - assert data.get("success") is True - # Small delay for startup - time.sleep(2) - - -class TestContainerEndpoints: - """Container management endpoints.""" - - def test_containers_list(self, api_client, api_url): - resp = api_client.get(f"{api_url}/containers") - assert resp.status_code == 200 - data = resp.json() - assert "containers" in data - assert len(data.get("containers", [])) > 0 - - def test_container_restart_with_verification(self, api_client, api_url): - container_name = "mongo" - # Pre-condition: must be running - pre = api_client.get(f"{api_url}/containers") - assert pre.status_code == 200 - pre_data = pre.json() - containers = pre_data.get("containers", []) - mongo = next((c for c in containers if c.get("name") == container_name), None) - assert mongo is not None, "Container 'mongo' not found" - assert mongo.get("status") == "running" - - # Perform restart - resp = api_client.post( - f"{api_url}/containers/{container_name}/restart", - json={"action": "restart"} - ) - assert resp.status_code == 200 - data = resp.json() - assert data.get("success") is True - - # Post-condition: wait for running status with recent started_at - max_wait = 20 - start = time.time() - new_mongo = None - while time.time() - start < max_wait: - post = api_client.get(f"{api_url}/containers") - if post.status_code == 200: - post_data = post.json() - post_containers = post_data.get("containers", []) - new_mongo = next((c for c in post_containers if c.get("name") == container_name), None) - if new_mongo and new_mongo.get("status") == "running" and new_mongo.get("started_at"): - break - time.sleep(0.5) - - assert new_mongo is not None, "mongo container not available after restart" - assert new_mongo.get("status") == "running" - started_at_str = new_mongo.get("started_at") - assert started_at_str, "started_at missing" - if started_at_str.endswith("Z"): - started_at_str = started_at_str[:-1] + "+00:00" - started_at = datetime.fromisoformat(started_at_str) - delta = (datetime.now(timezone.utc) - started_at).total_seconds() - assert delta >= 0 - assert delta <= 15, f"started_at too old ({delta:.2f}s)" - - -class TestEnvironmentVariables: - """Environment variables endpoints.""" - - def test_environment_variable_crud_cycle(self, api_client, api_url): - """ - Complete GET/PUT cycle test for environment variables: - - Initial GET to retrieve MCC value - - PUT with opposite value - - GET to verify change - - PUT to restore original value - - Final GET to verify restoration - """ - # PHASE 1: Initial GET - retrieve current MCC value - resp_initial = api_client.get(f"{api_url}/envs") - assert 
resp_initial.status_code == 200 - data_initial = resp_initial.json() - vars_initial = data_initial.get("variables", []) - mcc_initial = next((v for v in vars_initial if v.get("name") == "MCC"), None) - assert mcc_initial is not None - original_value = mcc_initial.get("value") - assert original_value in ["001", "214"], f"MCC must be 001 or 214, got: {original_value}" - - # PHASE 2: PUT with opposite value - new_value = "214" if original_value == "001" else "001" - payload_update = {"variables": {"MCC": new_value}, "restart_services": False} - resp_update = api_client.put(f"{api_url}/envs", json=payload_update) - assert resp_update.status_code == 200 - data_update = resp_update.json() - assert data_update.get("success") is True - assert "MCC" in data_update.get("updated", []) - assert data_update.get("restarted_services") == {} - - # PHASE 3: GET to verify the change - resp_after_change = api_client.get(f"{api_url}/envs") - assert resp_after_change.status_code == 200 - data_after_change = resp_after_change.json() - vars_after_change = data_after_change.get("variables", []) - mcc_after_change = next((v for v in vars_after_change if v.get("name") == "MCC"), None) - assert mcc_after_change is not None - assert mcc_after_change.get("value") == new_value, \ - f"MCC must be {new_value}, got: {mcc_after_change.get('value')}" - - # PHASE 4: PUT to restore original value - payload_restore = {"variables": {"MCC": original_value}} - resp_restore = api_client.put(f"{api_url}/envs", json=payload_restore) - assert resp_restore.status_code == 200 - data_restore = resp_restore.json() - assert data_restore.get("success") is True - assert "MCC" in data_restore.get("updated", []) - assert data_restore.get("restarted_services") is not None - time.sleep(2) # Wait for service restart when restart_services defaults to true - - # PHASE 5: Final GET to verify restoration - resp_final = api_client.get(f"{api_url}/envs") - assert resp_final.status_code == 200 - data_final = resp_final.json() - vars_final = data_final.get("variables", []) - mcc_final = next((v for v in vars_final if v.get("name") == "MCC"), None) - assert mcc_final is not None - assert mcc_final.get("value") == original_value, \ - f"MCC must be restored to {original_value}, got: {mcc_final.get('value')}" - - -class TestWebSocket: - """WebSocket log streaming.""" - - async def test_websocket_logs(self, api_url): - ws_url = api_url.replace("http", "ws") + "/containers/mongo/logs" - async with websockets.connect(ws_url) as websocket: - try: - msg = await asyncio.wait_for(websocket.recv(), timeout=5.0) - assert len(msg) > 0 - except asyncio.TimeoutError: - # No recent logs is also valid - pass - - -class TestDeploymentLifecycle: - """Complete shutdown/ping lifecycle.""" - - def test_deployment_down(self, api_client, api_url): - resp = api_client.post(f"{api_url}/down") - assert resp.status_code == 200 - data = resp.json() - assert data.get("success") is True - time.sleep(2) - - def test_ping_after_shutdown(self, api_client, api_url): - resp = api_client.get(f"{api_url}/ping") - assert resp.status_code == 200 - data = resp.json() - assert data.get("success") is True diff --git a/tests/test_containers.py b/tests/test_containers.py new file mode 100644 index 0000000..aff9f7d --- /dev/null +++ b/tests/test_containers.py @@ -0,0 +1,119 @@ +""" +Test cases for the Container endpoints (/containers). 
+ +Tests container-level operations: +- Get container list +- Container lifecycle with SSE monitoring +- Individual container actions (stop, start, restart) +""" +import time +import pytest +from tests.utils import stream_task_updates + + +class TestContainerEndpoints: + """Container endpoint tests with SSE monitoring.""" + + def test_containers_list(self, api_client, api_url, api_server, check_dependencies): + """Test GET /containers returns container list.""" + print("\n=== CONTAINERS LIST TEST ===\n") + + # Deploy first + print("Deploying stack...") + resp_up = api_client.post(f"{api_url}/deployment/up") + assert resp_up.status_code == 202 + + task_id = resp_up.json()["task_id"] + final_state = stream_task_updates(api_url, task_id, f"/deployment/tasks/{task_id}/stream", timeout=120) + + # Wait for deployment to be ready + time.sleep(2) + + # Get containers list + print("\nGetting containers list...") + resp_list = api_client.get(f"{api_url}/containers") + + assert resp_list.status_code == 200, "GET /containers should return 200" + data = resp_list.json() + + # Validate response structure + assert isinstance(data, dict), "Response should be a dict" + assert "containers" in data, "Response should contain 'containers'" + + containers = data["containers"] + assert isinstance(containers, list), "Containers should be a list" + assert len(containers) > 0, "Should have at least one container running" + + # Validate each container has required fields + for container in containers: + assert "name" in container + assert "status" in container + assert "ports" in container or container["ports"] is None + + print(f"✓ Found {len(containers)} containers") + + # Cleanup + print("\nCleaning up (compose kill)...") + resp_kill = api_client.post(f"{api_url}/deployment/kill") + assert resp_kill.status_code == 202 + + task_id_kill = resp_kill.json()["task_id"] + stream_task_updates(api_url, task_id_kill, f"/deployment/tasks/{task_id_kill}/stream", timeout=120) + + def test_container_individual_action_with_sse(self, api_client, api_url, api_server, check_dependencies): + """ + Test individual container actions (stop/start) with SSE monitoring. + + Sequence: + 1. Deploy UP and wait for all containers running + 2. Stop specific container (MME) with SSE monitoring + 3. Verify container state changed + 4. 
Deploy KILL + """ + print("\n=== CONTAINER INDIVIDUAL ACTION TEST ===\n") + + # PHASE 1: Deploy UP + print("PHASE 1: Deploying stack...") + resp_up = api_client.post(f"{api_url}/deployment/up") + assert resp_up.status_code == 202 + + task_id_up = resp_up.json()["task_id"] + print(f"Monitoring deployment (task {task_id_up[:8]}...)...") + final_state_up = stream_task_updates( + api_url, task_id_up, f"/deployment/tasks/{task_id_up}/stream", timeout=120 + ) + assert final_state_up.get("task_status") == "completed" + + time.sleep(2) + + # Get container list to identify target + resp_list = api_client.get(f"{api_url}/containers") + containers = resp_list.json().get("containers", []) + print(f"Active containers: {[c['name'] for c in containers]}") + + # Find and stop MME container + mme_container = next((c for c in containers if "mme" in c["name"].lower()), None) + if mme_container: + container_name = mme_container["name"] + print(f"\nPHASE 2: Stopping container {container_name}...") + + resp_stop = api_client.post(f"{api_url}/containers/{container_name}/stop") + assert resp_stop.status_code == 202, "Stop should return 202 Accepted" + + task_id_stop = resp_stop.json()["task_id"] + print(f"Monitoring stop action (task {task_id_stop[:8]}...)...") + + final_state_stop = stream_task_updates( + api_url, task_id_stop, f"/containers/{container_name}/tasks/{task_id_stop}/stream", timeout=60 + ) + assert final_state_stop.get("task_status") == "completed" + + # PHASE 3: Deploy KILL + print("\nPHASE 3: Cleaning up (compose kill)...") + resp_kill = api_client.post(f"{api_url}/deployment/kill") + assert resp_kill.status_code == 202 + + task_id_kill = resp_kill.json()["task_id"] + stream_task_updates(api_url, task_id_kill, f"/deployment/tasks/{task_id_kill}/stream", timeout=120) + + print("\n✓ Container action test passed") diff --git a/tests/test_deployment.py b/tests/test_deployment.py new file mode 100644 index 0000000..3d83293 --- /dev/null +++ b/tests/test_deployment.py @@ -0,0 +1,76 @@ +""" +Test cases for the Deployment endpoints (/deployment). + +Tests the full deployment lifecycle: +- Deploy UP with real-time SSE monitoring +- Deploy DOWN with real-time SSE monitoring +""" +import time +from tests.utils import stream_task_updates + + +class TestDeploymentEndpoints: + """Deployment endpoint tests with SSE monitoring.""" + + def test_deployment_lifecycle_with_sse_monitoring(self, api_client, api_url, api_server, check_dependencies): + """ + Test complete deployment lifecycle with real-time SSE monitoring. + + Sequence: + 1. POST /deployment/up to start deployment + 2. Monitor SSE stream to track container state transitions + 3. Verify all containers reach STARTED state + 4. POST /deployment/down to down deployment + 5. Monitor SSE stream for shutdown progression + 6. Verify all containers reach REMOVED state + """ + print("\n=== DEPLOYMENT LIFECYCLE TEST ===\n") + + # PHASE 1: Start deployment + print("PHASE 1: Starting deployment (compose up)...") + resp_up = api_client.post(f"{api_url}/deployment/up") + + assert resp_up.status_code == 202, "UP should return 202 Accepted" + data_up = resp_up.json() + assert "task_id" in data_up, "Response should contain task_id" + assert data_up.get("status") == "running", "Initial status should be 'running'" + + task_id_up = data_up["task_id"] + sse_endpoint_up = f"/deployment/tasks/{task_id_up}/stream" + + # Stream and monitor UP progress + print(f"Monitoring UP task {task_id_up[:8]}... 
via SSE...") + final_state_up = stream_task_updates( + api_url, task_id_up, sse_endpoint_up, timeout=120 + ) + + # Verify all containers are in STARTED state + print(f"Final UP task: {final_state_up}") + assert final_state_up.get("task_status") == "completed", "UP task should be completed" + assert final_state_up.get("operation") == "up" + + time.sleep(2) # Give containers time to settle + + # PHASE 2: Stop deployment + print("\nPHASE 2: Stopping deployment (compose down)...") + resp_down = api_client.post(f"{api_url}/deployment/down") + + assert resp_down.status_code == 202, "DOWN should return 202 Accepted" + data_down = resp_down.json() + assert "task_id" in data_down, "Response should contain task_id" + + task_id_down = data_down["task_id"] + sse_endpoint_down = f"/deployment/tasks/{task_id_down}/stream" + + # Stream and monitor DOWN progress + print(f"Monitoring DOWN task {task_id_down[:8]}... via SSE...") + final_state_down = stream_task_updates( + api_url, task_id_down, sse_endpoint_down, timeout=120 + ) + + # Verify all containers are in REMOVED state + print(f"Final DOWN task: {final_state_down}") + assert final_state_down.get("task_status") == "completed", "DOWN task should be completed" + assert final_state_down.get("operation") == "down" + + print("\n✓ Deployment lifecycle test passed") diff --git a/tests/test_envs.py b/tests/test_envs.py new file mode 100644 index 0000000..a86616c --- /dev/null +++ b/tests/test_envs.py @@ -0,0 +1,91 @@ +""" +Test cases for the Environment endpoints (/envs). + +Tests environment variable management: +- Update environment variables +- Verify service restart via SSE monitoring +- Confirm completion before validation +""" +import time +import pytest +from tests.utils import stream_task_updates + + +class TestEnvironmentEndpoints: + """Environment endpoints tests with SSE monitoring.""" + + def test_env_update_with_sse_completion(self, api_client, api_url, api_server, check_dependencies): + """ + Test environment variable update with SSE completion monitoring. + + Sequence: + 1. Deploy UP and wait for all containers running + 2. Update environment variable (MCC to 214) + 3. Monitor SSE stream for service restart completion + 4. Verify update was applied by checking environment + 5. 
Deploy KILL to clean up + """ + print("\n=== ENVIRONMENT UPDATE TEST ===\n") + + # PHASE 1: Deploy UP + print("PHASE 1: Deploying stack...") + resp_up = api_client.post(f"{api_url}/deployment/up") + assert resp_up.status_code == 202 + + task_id_up = resp_up.json()["task_id"] + print(f"Monitoring deployment (task {task_id_up[:8]}...)...") + final_state_up = stream_task_updates( + api_url, task_id_up, f"/deployment/tasks/{task_id_up}/stream", timeout=120 + ) + assert final_state_up.get("task_status") == "completed" + + time.sleep(2) + + # PHASE 2: Update environment variable + print("\nPHASE 2: Updating environment variable (MCC=214)...") + + update_payload = { + "variables": { + "MCC": "214" + } + } + + resp_update = api_client.put(f"{api_url}/envs", json=update_payload) + assert resp_update.status_code == 202, "PUT /envs should return 202 Accepted" + + data_update = resp_update.json() + assert "task_id" in data_update, "Response should contain task_id" + + task_id_update = data_update["task_id"] + print(f"Monitoring environment update (task {task_id_update[:8]}...)...") + + # Monitor the environment update with SSE + final_state_update = stream_task_updates( + api_url, task_id_update, f"/envs/tasks/{task_id_update}/stream", timeout=120 + ) + assert final_state_update.get("task_status") == "completed", "Environment update should be completed" + + print("✓ Environment update completed successfully via SSE") + + time.sleep(1) + + # PHASE 3: Verify environment was updated + print("\nPHASE 3: Verifying environment update...") + resp_get_env = api_client.get(f"{api_url}/envs") + + if resp_get_env.status_code == 200: + env_data = resp_get_env.json() + if "environment" in env_data: + mcc_value = env_data["environment"].get("MCC") + print(f"MCC environment variable: {mcc_value}") + # Note: Actual verification would depend on API returning current env state + + # PHASE 4: Deploy KILL to clean up + print("\nPHASE 4: Cleaning up (compose kill)...") + resp_kill = api_client.post(f"{api_url}/deployment/kill") + assert resp_kill.status_code == 202 + + task_id_kill = resp_kill.json()["task_id"] + stream_task_updates(api_url, task_id_kill, f"/deployment/tasks/{task_id_kill}/stream", timeout=120) + + print("\n✓ Environment update test completed successfully") diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 0000000..13b1d62 --- /dev/null +++ b/tests/utils.py @@ -0,0 +1,65 @@ +""" +Test helper utilities. +""" +from typing import Dict, Any, Optional +import time +import json +import requests + + +def stream_task_updates( + api_url: str, + task_id: str, + endpoint: str, + timeout: int = 120, +) -> Dict[str, Any]: + """ + Stream SSE task updates and return final task state. 
+ + Args: + api_url: Base API URL + task_id: Task ID to monitor + endpoint: Endpoint to stream from (e.g., "/deployment/tasks/{task_id}/stream") + timeout: Maximum wait time in seconds + + Returns: + Final task state dictionary + """ + session = requests.Session() + start_time = time.time() + last_state = None + + try: + response = session.get(f"{api_url}{endpoint}", stream=True, timeout=timeout) + response.raise_for_status() + + for line in response.iter_lines(): + if time.time() - start_time > timeout: + raise TimeoutError(f"Task monitoring exceeded {timeout}s timeout") + + if not line or line.startswith(b':'): + continue + + # Parse SSE format: "data: {json}" + if line.startswith(b'data:'): + json_str = line[5:].strip() + try: + last_state = json.loads(json_str) + # Print real-time updates + if "task_status" in last_state: + print(f" Task {task_id[:8]}... - Status: {last_state['task_status']}") + + except json.JSONDecodeError: + pass + + # Check for completion + if line.startswith(b'event: done'): + break + + except Exception as e: + print(f"Error streaming task updates: {e}") + raise + finally: + session.close() + + return last_state or {}
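For reference, the task lifecycle that `stream_task_updates` observes over SSE is driven entirely by `TaskManager`. A standalone sketch of that lifecycle (not part of the diff), assuming the package import paths used elsewhere in this diff and that `TaskStatus` in `models/task.py` (not shown here) exposes `COMPLETED` and `FAILED` members:

```python
import asyncio
import time

from base_deployment_controller.models.task import TaskStatus
from base_deployment_controller.services.task_manager import TaskManager


async def main() -> None:
    manager = TaskManager(ttl=60)

    # The callable runs in a thread-pool executor, so blocking work is fine here.
    task_id = await manager.create_task("demo", lambda: time.sleep(1))

    # Poll until the task leaves PENDING/RUNNING; the /tasks/{task_id} routes
    # and their SSE streams expose exactly this state machine over HTTP.
    while True:
        task = manager.get_task(task_id)
        if task.task_status in (TaskStatus.COMPLETED, TaskStatus.FAILED):
            break
        await asyncio.sleep(0.2)

    print(task.task_status, task.error)

    # Completed tasks are only evicted once they are older than the TTL.
    await manager.cleanup_old_tasks()


asyncio.run(main())
```

If the callable raises, `_execute_task` records the message in `task.error` and marks the task `FAILED` instead of propagating the exception to the caller.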