diff --git a/src/itential_mcp/platform/services/__init__.py b/src/itential_mcp/platform/services/__init__.py index 16905b0..5c6efcc 100644 --- a/src/itential_mcp/platform/services/__init__.py +++ b/src/itential_mcp/platform/services/__init__.py @@ -2,10 +2,12 @@ # GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later +from typing import Any + from ipsdk.platform import AsyncPlatform -class ServiceBase(object): +class ServiceBase: """Abstract base class for Itential Platform service implementations. ServiceBase provides a common interface and foundation for all service @@ -13,6 +15,9 @@ class ServiceBase(object): for encapsulating specific API operations and providing a clean interface for tool implementations to consume platform resources. + This base class provides common utilities such as pagination helpers that + reduce code duplication across service implementations. + Args: client: An AsyncPlatform client instance for communicating with the Itential Platform API @@ -21,5 +26,86 @@ class ServiceBase(object): client (AsyncPlatform): The platform client used for API communication """ + __slots__ = ("client",) + def __init__(self, client: AsyncPlatform): self.client = client + + async def _paginate( + self, + endpoint: str, + *, + params: dict[str, Any] | None = None, + limit: int = 100, + data_key: str = "results", + total_key: str = "total", + metadata_key: str | None = None, + ) -> list[dict[str, Any]]: + """Generic pagination helper for API endpoints. + + This helper method handles paginated retrieval from Itential Platform APIs + by making sequential requests with skip/limit parameters until all results + are retrieved. It supports different response formats through configurable + keys for data extraction. + + Args: + endpoint: API endpoint path to paginate (e.g., "/api/resources") + params: Optional query parameters to include in requests. The method + will automatically add 'limit' and 'skip' parameters. Defaults to None. + limit: Page size for each request (default: 100). Higher values reduce + the number of API calls but may impact response time. + data_key: Key in response containing data items (default: "results"). + Common values: "results", "data", "items", "list" + total_key: Key for total count in response (default: "total"). + Used to determine when pagination is complete. + metadata_key: Optional parent key for total count (default: None). + When provided, total is extracted from response[metadata_key][total_key]. + Common values: "metadata", "meta" + + Returns: + list[dict[str, Any]]: Complete list of all paginated results combined + from all API requests + + Example: + # Simple pagination with defaults + results = await self._paginate("/api/resources") + + # With parameters and custom keys + results = await self._paginate( + "/api/resources", + params={"filter": "active"}, + data_key="data", + total_key="total", + metadata_key="metadata" + ) + + # For POST-based pagination + # (Note: This method uses GET; services should override for POST) + """ + skip = 0 + params = params or {} + params["limit"] = limit + results = [] + + while True: + params["skip"] = skip + + res = await self.client.get(endpoint, params=params) + data = res.json() + + # Extract items from response + items = data.get(data_key, []) + results.extend(items) + + # Extract total count (handle nested keys) + if metadata_key: + total = data.get(metadata_key, {}).get(total_key, 0) + else: + total = data.get(total_key, 0) + + if len(results) >= total: + break + + skip += limit + + return results diff --git a/src/itential_mcp/platform/services/_enums.py b/src/itential_mcp/platform/services/_enums.py new file mode 100644 index 0000000..040470b --- /dev/null +++ b/src/itential_mcp/platform/services/_enums.py @@ -0,0 +1,42 @@ +# Copyright (c) 2025 Itential, Inc +# GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt) +# SPDX-License-Identifier: GPL-3.0-or-later + +"""Enumerations for Itential Platform service states and types. + +This module provides type-safe enumerations for various states and types +used across the platform services, reducing the risk of typos and improving +code maintainability. +""" + +from enum import Enum + + +class AdapterState(str, Enum): + """Valid adapter states in Itential Platform. + + Adapters can be in one of these states during their lifecycle. + These states represent the operational status of an adapter instance. + """ + + RUNNING = "RUNNING" + STOPPED = "STOPPED" + DEAD = "DEAD" + DELETED = "DELETED" + STARTING = "STARTING" + STOPPING = "STOPPING" + + +class ApplicationState(str, Enum): + """Valid application states in Itential Platform. + + Applications can be in one of these states during their lifecycle. + These states represent the operational status of an application instance. + """ + + RUNNING = "RUNNING" + STOPPED = "STOPPED" + DEAD = "DEAD" + DELETED = "DELETED" + STARTING = "STARTING" + STOPPING = "STOPPING" diff --git a/src/itential_mcp/platform/services/adapters.py b/src/itential_mcp/platform/services/adapters.py index 64c0e74..c23dc86 100644 --- a/src/itential_mcp/platform/services/adapters.py +++ b/src/itential_mcp/platform/services/adapters.py @@ -5,8 +5,8 @@ import asyncio from itential_mcp.core import exceptions - from itential_mcp.platform.services import ServiceBase +from itential_mcp.platform.services._enums import AdapterState class Service(ServiceBase): @@ -27,7 +27,7 @@ class Service(ServiceBase): name: str = "adapters" - async def _get_adapter_health(self, name): + async def _get_adapter_health(self, name: str) -> dict: """ Retrieve health information for a specific adapter from Itential Platform. @@ -63,9 +63,30 @@ async def _get_adapter_health(self, name): return data - async def start_adapter(self, name, timeout): + async def _poll_for_state( + self, name: str, target_state: str, interval: float = 1.0 + ) -> dict: + """Poll adapter health until target state is reached. + + Args: + name: Adapter name to poll + target_state: The desired adapter state to wait for + interval: Polling interval in seconds (default: 1.0) + + Returns: + Final adapter health data when target state is reached """ - Start an adapter on Itential Platform with state validation and timeout. + while True: + data = await self._get_adapter_health(name) + state = data["results"][0]["state"] + + if state == target_state: + return data["results"][0] + + await asyncio.sleep(interval) + + async def start_adapter(self, name: str, timeout: int) -> dict: + """Start an adapter on Itential Platform with state validation and timeout. This method manages the complete lifecycle of starting an adapter, including initial state validation, issuing start commands, and polling @@ -73,13 +94,13 @@ async def start_adapter(self, name, timeout): and provides appropriate error handling. Args: - name (str): Case-sensitive adapter name to start. Must match an + name: Case-sensitive adapter name to start. Must match an existing adapter configuration on the platform - timeout (int): Maximum seconds to wait for the adapter to reach - RUNNING state. Countdown decreases by 1 each second during polling + timeout: Maximum seconds to wait for the adapter to reach + RUNNING state Returns: - StartAdapterResponse: Response model containing: + dict: Response containing adapter health data with: - name: The adapter name that was started - state: Final operational state after the operation @@ -97,32 +118,34 @@ async def start_adapter(self, name, timeout): - Polling occurs every 1 second until timeout or success """ data = await self._get_adapter_health(name) - state = data["results"][0]["state"] - - if state == "STOPPED": - await self.client.put(f"/adapters/{name}/start") - - while timeout: - data = await self._get_adapter_health(name) - state = data["results"][0]["state"] + state = AdapterState(data["results"][0]["state"]) - if state == "RUNNING": - break + if state == AdapterState.RUNNING: + return data["results"][0] - await asyncio.sleep(1) - timeout -= 1 + if state in (AdapterState.DEAD, AdapterState.DELETED): + raise exceptions.InvalidStateError( + f"adapter '{name}' is in {state.value} state and cannot be started" + ) - elif state in ("DEAD", "DELETED"): - raise exceptions.InvalidStateError(f"adapter `{name}` is `{state}`") + if state == AdapterState.STOPPED: + await self.client.put(f"/adapters/{name}/start") - if timeout == 0: - raise exceptions.TimeoutExceededError() + try: + result = await asyncio.wait_for( + self._poll_for_state(name, AdapterState.RUNNING.value), + timeout=timeout, + ) + return result + except asyncio.TimeoutError: + raise exceptions.TimeoutExceededError( + f"adapter '{name}' did not reach RUNNING state within {timeout}s" + ) return data["results"][0] - async def stop_adapter(self, name, timeout): - """ - Stop an adapter on Itential Platform with state validation and timeout. + async def stop_adapter(self, name: str, timeout: int) -> dict: + """Stop an adapter on Itential Platform with state validation and timeout. This method manages the complete lifecycle of stopping an adapter, including initial state validation, issuing stop commands, and polling @@ -130,13 +153,13 @@ async def stop_adapter(self, name, timeout): and provides appropriate error handling. Args: - name (str): Case-sensitive adapter name to stop. Must match an + name: Case-sensitive adapter name to stop. Must match an existing adapter configuration on the platform - timeout (int): Maximum seconds to wait for the adapter to reach - STOPPED state. Countdown decreases by 1 each second during polling + timeout: Maximum seconds to wait for the adapter to reach + STOPPED state Returns: - StopAdapterResponse: Response model containing: + dict: Response containing adapter health data with: - name: The adapter name that was stopped - state: Final operational state after the operation @@ -154,45 +177,47 @@ async def stop_adapter(self, name, timeout): - Polling occurs every 1 second until timeout or success """ data = await self._get_adapter_health(name) - state = data["results"][0]["state"] - - if state == "RUNNING": - await self.client.put(f"/adapters/{name}/stop") + state = AdapterState(data["results"][0]["state"]) - while timeout: - data = await self._get_adapter_health(name) - state = data["results"][0]["state"] + if state == AdapterState.STOPPED: + return data["results"][0] - if state == "STOPPED": - break + if state in (AdapterState.DEAD, AdapterState.DELETED): + raise exceptions.InvalidStateError( + f"adapter '{name}' is in {state.value} state and cannot be stopped" + ) - await asyncio.sleep(1) - timeout -= 1 - - elif state in ("DEAD", "DELETED"): - raise exceptions.InvalidStateError(f"adapter `{name}` is `{state}`") + if state == AdapterState.RUNNING: + await self.client.put(f"/adapters/{name}/stop") - if timeout == 0: - raise exceptions.TimeoutExceededError() + try: + result = await asyncio.wait_for( + self._poll_for_state(name, AdapterState.STOPPED.value), + timeout=timeout, + ) + return result + except asyncio.TimeoutError: + raise exceptions.TimeoutExceededError( + f"adapter '{name}' did not reach STOPPED state within {timeout}s" + ) return data["results"][0] - async def restart_adapter(self, name, timeout): - """ - Restart an adapter on Itential Platform with state validation and timeout. + async def restart_adapter(self, name: str, timeout: int) -> dict: + """Restart an adapter on Itential Platform with state validation and timeout. This method manages the complete lifecycle of restarting an adapter, including initial state validation, issuing restart commands, and polling for successful state transitions. Only RUNNING adapters can be restarted. Args: - name (str): Case-sensitive adapter name to restart. Must match an + name: Case-sensitive adapter name to restart. Must match an existing adapter configuration on the platform - timeout (int): Maximum seconds to wait for the adapter to return to - RUNNING state. Countdown decreases by 1 each second during polling + timeout: Maximum seconds to wait for the adapter to return to + RUNNING state Returns: - RestartAdapterResponse: Response model containing: + dict: Response containing adapter health data with: - name: The adapter name that was restarted - state: Final operational state after the operation (should be RUNNING) @@ -210,25 +235,25 @@ async def restart_adapter(self, name, timeout): - Polling occurs every 1 second until timeout or success """ data = await self._get_adapter_health(name) - state = data["results"][0]["state"] + state = AdapterState(data["results"][0]["state"]) - if state == "RUNNING": - await self.client.put(f"/adapters/{name}/restart") - - while timeout: - data = await self._get_adapter_health(name) - state = data["results"][0]["state"] - - if state == "RUNNING": - break + if state in (AdapterState.DEAD, AdapterState.DELETED, AdapterState.STOPPED): + raise exceptions.InvalidStateError( + f"adapter '{name}' is in {state.value} state and cannot be restarted" + ) - await asyncio.sleep(1) - timeout -= 1 - - elif state in ("DEAD", "DELETED", "STOPPED"): - raise exceptions.InvalidStateError(f"adapter `{name}` is `{state}`") + if state == AdapterState.RUNNING: + await self.client.put(f"/adapters/{name}/restart") - if timeout == 0: - raise exceptions.TimeoutExceededError() + try: + result = await asyncio.wait_for( + self._poll_for_state(name, AdapterState.RUNNING.value), + timeout=timeout, + ) + return result + except asyncio.TimeoutError: + raise exceptions.TimeoutExceededError( + f"adapter '{name}' did not return to RUNNING state within {timeout}s" + ) return data["results"][0] diff --git a/src/itential_mcp/platform/services/applications.py b/src/itential_mcp/platform/services/applications.py index aece235..63d6548 100644 --- a/src/itential_mcp/platform/services/applications.py +++ b/src/itential_mcp/platform/services/applications.py @@ -6,6 +6,7 @@ from itential_mcp.core import exceptions from itential_mcp.platform.services import ServiceBase +from itential_mcp.platform.services._enums import ApplicationState class Service(ServiceBase): @@ -42,6 +43,28 @@ async def _get_application_health(self, name: str) -> dict: return data["results"][0] + async def _poll_for_state( + self, name: str, target_state: str, interval: float = 1.0 + ) -> dict: + """Poll application health until target state is reached. + + Args: + name: Application name to poll + target_state: The desired application state to wait for + interval: Polling interval in seconds (default: 1.0) + + Returns: + Final application health data when target state is reached + """ + while True: + data = await self._get_application_health(name) + state = data["state"] + + if state == target_state: + return data + + await asyncio.sleep(interval) + async def start_application(self, name: str, timeout: int) -> dict: """Start an application on the Itential Platform. @@ -49,42 +72,45 @@ async def start_application(self, name: str, timeout: int) -> dict: Waits for the application to reach a RUNNING state within the timeout period. Args: - name (str): The name of the application to start. - timeout (int): Maximum time in seconds to wait for the application - to reach RUNNING state. + name: The name of the application to start + timeout: Maximum time in seconds to wait for the application + to reach RUNNING state Returns: - dict: Application health data after the operation completes. + dict: Application health data after the operation completes Raises: exceptions.InvalidStateError: If the application is in DEAD or DELETED - state and cannot be started. + state and cannot be started exceptions.TimeoutExceededError: If the application does not reach - RUNNING state within the specified timeout period. + RUNNING state within the specified timeout period exceptions.NotFoundError: If the application with the specified name - cannot be found on the platform. + cannot be found on the platform """ data = await self._get_application_health(name) - state = data["results"][0]["state"] - - if state == "STOPPED": - await self.client.put(f"/applications/{name}/start") - - while timeout: - data = await self._get_application_health(name) - state = data["results"][0]["state"] + state = ApplicationState(data["state"]) - if state == "RUNNING": - break + if state == ApplicationState.RUNNING: + return data - await asyncio.sleep(1) - timeout -= 1 + if state in (ApplicationState.DEAD, ApplicationState.DELETED): + raise exceptions.InvalidStateError( + f"application '{name}' is in {state.value} state and cannot be started" + ) - elif state in ("DEAD", "DELETED"): - raise exceptions.InvalidStateError(f"application `{name}` is `{state}`") + if state == ApplicationState.STOPPED: + await self.client.put(f"/applications/{name}/start") - if timeout == 0: - raise exceptions.TimeoutExceededError() + try: + result = await asyncio.wait_for( + self._poll_for_state(name, ApplicationState.RUNNING.value), + timeout=timeout, + ) + return result + except asyncio.TimeoutError: + raise exceptions.TimeoutExceededError( + f"application '{name}' did not reach RUNNING state within {timeout}s" + ) return data @@ -95,43 +121,45 @@ async def stop_application(self, name: str, timeout: int) -> dict: Waits for the application to reach a STOPPED state within the timeout period. Args: - name (str): The name of the application to stop. - timeout (int): Maximum time in seconds to wait for the application - to reach STOPPED state. + name: The name of the application to stop + timeout: Maximum time in seconds to wait for the application + to reach STOPPED state Returns: - dict: Application health data after the operation completes. + dict: Application health data after the operation completes Raises: exceptions.InvalidStateError: If the application is in DEAD or DELETED - state and cannot be stopped. + state and cannot be stopped exceptions.TimeoutExceededError: If the application does not reach - STOPPED state within the specified timeout period. + STOPPED state within the specified timeout period exceptions.NotFoundError: If the application with the specified name - cannot be found on the platform. + cannot be found on the platform """ data = await self._get_application_health(name) - state = data["results"][0]["state"] - - if state == "RUNNING": - await self.client.put(f"/applications/{name}/stop") - - while timeout: - data = await self._get_application_health(name) + state = ApplicationState(data["state"]) - state = data["results"][0]["state"] + if state == ApplicationState.STOPPED: + return data - if state == "STOPPED": - break + if state in (ApplicationState.DEAD, ApplicationState.DELETED): + raise exceptions.InvalidStateError( + f"application '{name}' is in {state.value} state and cannot be stopped" + ) - await asyncio.sleep(1) - timeout -= 1 - - elif state in ("DEAD", "DELETED"): - raise exceptions.InvalidStateError(f"application `{name}` is `{state}`") + if state == ApplicationState.RUNNING: + await self.client.put(f"/applications/{name}/stop") - if timeout == 0: - raise exceptions.TimeoutExceededError() + try: + result = await asyncio.wait_for( + self._poll_for_state(name, ApplicationState.STOPPED.value), + timeout=timeout, + ) + return result + except asyncio.TimeoutError: + raise exceptions.TimeoutExceededError( + f"application '{name}' did not reach STOPPED state within {timeout}s" + ) return data @@ -142,42 +170,45 @@ async def restart_application(self, name: str, timeout: int) -> dict: Waits for the application to return to RUNNING state within the timeout period. Args: - name (str): The name of the application to restart. - timeout (int): Maximum time in seconds to wait for the application - to return to RUNNING state after restart. + name: The name of the application to restart + timeout: Maximum time in seconds to wait for the application + to return to RUNNING state after restart Returns: - dict: Application health data after the operation completes. + dict: Application health data after the operation completes Raises: exceptions.InvalidStateError: If the application is in DEAD, DELETED, - or STOPPED state and cannot be restarted. + or STOPPED state and cannot be restarted exceptions.TimeoutExceededError: If the application does not return to - RUNNING state within the specified timeout period. + RUNNING state within the specified timeout period exceptions.NotFoundError: If the application with the specified name - cannot be found on the platform. + cannot be found on the platform """ data = await self._get_application_health(name) - state = data["results"][0]["state"] - - if state == "RUNNING": + state = ApplicationState(data["state"]) + + if state in ( + ApplicationState.DEAD, + ApplicationState.DELETED, + ApplicationState.STOPPED, + ): + raise exceptions.InvalidStateError( + f"application '{name}' is in {state.value} state and cannot be restarted" + ) + + if state == ApplicationState.RUNNING: await self.client.put(f"/applications/{name}/restart") - while timeout: - data = await self._get_application_health(name) - - state = data["results"][0]["state"] - - if state == "RUNNING": - break - - await asyncio.sleep(1) - timeout -= 1 - - elif state in ("DEAD", "DELETED", "STOPPED"): - raise exceptions.InvalidStateError(f"application `{name}` is `{state}`") - - if timeout == 0: - raise exceptions.TimeoutExceededError() + try: + result = await asyncio.wait_for( + self._poll_for_state(name, ApplicationState.RUNNING.value), + timeout=timeout, + ) + return result + except asyncio.TimeoutError: + raise exceptions.TimeoutExceededError( + f"application '{name}' did not return to RUNNING state within {timeout}s" + ) return data diff --git a/src/itential_mcp/platform/services/automation_studio.py b/src/itential_mcp/platform/services/automation_studio.py index b373c47..78c5bbd 100644 --- a/src/itential_mcp/platform/services/automation_studio.py +++ b/src/itential_mcp/platform/services/automation_studio.py @@ -232,8 +232,8 @@ async def describe_template( from the Automation Studio API. """ if project is not None: - p = await self.describe_project(project) - name = f"@{p['_id']}: {name}" + project_data = await self.describe_project(project) + name = f"@{project_data['_id']}: {name}" res = await self._get_templates(params={"equals[name]": name}) if len(res) != 1: raise exceptions.NotFoundError(f"template {name} could not found") @@ -388,31 +388,12 @@ async def get_projects(self) -> list[Mapping[str, Any]]: Exception: If there is an error retrieving projects from the Automation Studio API. """ - limit = 100 - skip = 0 - - params = {"limit": limit} - - results = list() - - while True: - params["skip"] = skip - - res = await self.client.get( - "/automation-studio/projects", - params=params, - ) - - data = res.json() - - results.extend(data["data"]) - - if len(results) == data["metadata"]["total"]: - break - - skip += limit - - return results + return await self._paginate( + "/automation-studio/projects", + data_key="data", + total_key="total", + metadata_key="metadata", + ) async def describe_project(self, name: str) -> dict: """Get detailed information about a specific Automation Studio project. diff --git a/src/itential_mcp/platform/services/configuration_manager.py b/src/itential_mcp/platform/services/configuration_manager.py index 0002543..b0a7ad6 100644 --- a/src/itential_mcp/platform/services/configuration_manager.py +++ b/src/itential_mcp/platform/services/configuration_manager.py @@ -274,12 +274,12 @@ async def add_golden_config_node( """ # Lookup tree id trees = await self.get_golden_config_trees() - for ele in trees: - if ele["name"] == tree_name: - tree_id = ele["id"] + for tree in trees: + if tree["name"] == tree_name: + tree_id = tree["id"] break else: - raise exceptions.NotFoundError(f"tree {tree_name} could not be found") + raise exceptions.NotFoundError(f"tree '{tree_name}' not found") try: res = await self.client.post( @@ -381,9 +381,9 @@ async def create_device_group( """ groups_response = await self.get_device_groups() - for ele in groups_response: - if ele["name"] == name: - raise ValueError(f"device group {name} already exists") + for group in groups_response: + if group["name"] == name: + raise ValueError(f"device group '{name}' already exists") body = { "groupName": name, @@ -450,12 +450,12 @@ async def remove_devices_from_group(self, name: str, devices: list) -> dict: device_group_id = data["id"] - device_groups = list() - for ele in data["devices"]: - if ele not in devices: - device_groups.append(ele) + remaining_devices = [] + for device in data["devices"]: + if device not in devices: + remaining_devices.append(device) - body = {"details": {"devices": device_groups}} + body = {"details": {"devices": remaining_devices}} res = await self.client.put( f"/configuration_manager/deviceGroups/{device_group_id}", json=body diff --git a/src/itential_mcp/platform/services/gateway_manager.py b/src/itential_mcp/platform/services/gateway_manager.py index 1513b0f..741cf4c 100644 --- a/src/itential_mcp/platform/services/gateway_manager.py +++ b/src/itential_mcp/platform/services/gateway_manager.py @@ -880,9 +880,7 @@ async def create_service_group( if metadata is not None: body["metadata"] = metadata - res = await self.client.post( - "/gateway_manager/v1/service-groups", json=body - ) + res = await self.client.post("/gateway_manager/v1/service-groups", json=body) return res.json() async def get_service_group(self, service_group_id: str) -> Mapping[str, Any]: diff --git a/src/itential_mcp/platform/services/integrations.py b/src/itential_mcp/platform/services/integrations.py index 9e8fa92..5e0f607 100644 --- a/src/itential_mcp/platform/services/integrations.py +++ b/src/itential_mcp/platform/services/integrations.py @@ -23,17 +23,16 @@ class Service(ServiceBase): name: str = "integrations" async def get_integrations(self, model: str | None = None) -> list[dict[str, Any]]: - """ - Get all integration instances from Itential Platform with optional filtering. + """Get all integration instances from Itential Platform with optional filtering. Integration instances are configured implementations of integration models that define connections to external systems. This method retrieves all instances or filters by a specific model type. Args: - model (str | None): Optional model name to filter results. If provided, + model: Optional model name to filter results. If provided, only returns integration instances associated with the specified model. - Defaults to None (returns all instances). + Defaults to None (returns all instances) Returns: list[dict[str, Any]]: List of integration instance dictionaries containing: @@ -45,34 +44,19 @@ async def get_integrations(self, model: str | None = None) -> list[dict[str, Any ConnectionException: If there is an error connecting to the platform AuthenticationException: If authentication credentials are invalid """ - limit = 100 - skip = 0 - - params = {"limit": limit} - + params = {} if model is not None: params.update({"containsField": "model", "contains": model}) - results = list() - - while True: - params["skip"] = skip - - res = await self.client.get( - "/integrations", - params=params, - ) - - data = res.json() - - results.extend([x["data"] for x in data["results"]]) - - if len(results) == data["total"]: - break - - skip += limit + results = await self._paginate( + "/integrations", + params=params, + data_key="results", + total_key="total", + ) - return results + # Extract data field from each result + return [item["data"] for item in results] async def get_integration_models(self) -> dict: """ diff --git a/src/itential_mcp/platform/services/mop.py b/src/itential_mcp/platform/services/mop.py index 68ba9eb..6b3b531 100644 --- a/src/itential_mcp/platform/services/mop.py +++ b/src/itential_mcp/platform/services/mop.py @@ -4,7 +4,7 @@ from __future__ import annotations -import time +from datetime import datetime, timezone from typing import Any from itential_mcp.platform.services import ServiceBase @@ -26,15 +26,22 @@ class Service(ServiceBase): name: str = "mop" - async def _get_project_id_from_name(self, name: str) -> str: + def _current_timestamp_ms(self) -> int: + """Get current UTC timestamp in milliseconds. + + Returns: + int: Current time in milliseconds since epoch """ - Get the project ID for a specified project name. + return int(datetime.now(timezone.utc).timestamp() * 1000) + + async def _get_project_id_from_name(self, name: str) -> str: + """Get the project ID for a specified project name. Args: - name (str): Case-sensitive project name to locate + name: Case-sensitive project name to locate Returns: - str: The project ID associated with the project name + The project ID associated with the project name Raises: ValueError: If the project name cannot be definitively located @@ -218,6 +225,7 @@ async def create_command_template( project_id = await self._get_project_id_from_name(project) template_name = f"@{project_id}: {name}" + current_time = self._current_timestamp_ms() body = { "mop": { "name": template_name, @@ -225,9 +233,9 @@ async def create_command_template( "passRule": pass_rule, "ignoreWarnings": ignore_warnings, "commands": commands, - "created": int(time.time() * 1000), # Current timestamp in milliseconds + "created": current_time, "createdBy": "system", # This should be replaced with actual user - "lastUpdated": int(time.time() * 1000), + "lastUpdated": current_time, "lastUpdatedBy": "system", } } @@ -294,7 +302,7 @@ async def update_command_template( "commands": commands, "created": existing_template.get("created"), "createdBy": existing_template.get("createdBy"), - "lastUpdated": int(time.time() * 1000), + "lastUpdated": self._current_timestamp_ms(), "lastUpdatedBy": "system", } } diff --git a/src/itential_mcp/platform/services/workflow_engine.py b/src/itential_mcp/platform/services/workflow_engine.py index df03379..2f9e476 100644 --- a/src/itential_mcp/platform/services/workflow_engine.py +++ b/src/itential_mcp/platform/services/workflow_engine.py @@ -20,8 +20,7 @@ class Service(ServiceBase): name: str = "workflow_engine" async def _get_route(self, path: str, params: dict | None = None) -> list[dict]: - """ - Generic method to retrieve paginated data from workflow engine API endpoints. + """Generic method to retrieve paginated data from workflow engine API endpoints. This internal method handles pagination automatically by making multiple API calls with skip/limit parameters until all results are retrieved. @@ -30,43 +29,21 @@ async def _get_route(self, path: str, params: dict | None = None) -> list[dict]: Args: path: The API endpoint path to retrieve data from params: Optional query parameters to filter results. The method will - add pagination parameters (limit, skip) automatically. + add pagination parameters (limit, skip) automatically Returns: - A list of dictionaries containing all results from the paginated API endpoint. - Each dictionary represents a single metric element (job or task). + list[dict]: List of all results from the paginated API endpoint. + Each dictionary represents a single metric element (job or task) Raises: Exception: If there is an error retrieving data from the API endpoint """ - limit = 100 - skip = 0 - - if params is not None: - params["limit"] = limit - else: - params = {"limit": limit} - - results = list() - - while True: - params["skip"] = skip - - res = await self.client.get( - path, - params=params, - ) - - data = res.json() - - results.extend(data["results"]) - - if len(results) == data["total"]: - break - - skip += limit - - return results + return await self._paginate( + path, + params=params, + data_key="results", + total_key="total", + ) async def get_job_metrics(self, params: dict | None = None) -> list[dict]: """ diff --git a/tests/test_services_adapters.py b/tests/test_services_adapters.py index e6013d9..1e80e60 100644 --- a/tests/test_services_adapters.py +++ b/tests/test_services_adapters.py @@ -142,19 +142,19 @@ async def test_start_adapter_from_stopped_success(self, service): async def test_start_adapter_timeout(self, service): """Test adapter start timeout scenario""" with patch.object(service, "_get_adapter_health") as mock_health: - # Always return STOPPED (never transitions to RUNNING) - mock_health.side_effect = [ - {"results": [{"state": "STOPPED"}]}, # Initial state - {"results": [{"state": "STOPPED"}]}, # After start attempt - {"results": [{"state": "STOPPED"}]}, # Still stopped... - ] + # Initial state is STOPPED + mock_health.return_value = {"results": [{"state": "STOPPED"}]} - with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: - with pytest.raises(exceptions.TimeoutExceededError): + # Mock _poll_for_state to raise TimeoutError + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.side_effect = asyncio.TimeoutError() + + with pytest.raises(exceptions.TimeoutExceededError) as exc_info: await service.start_adapter("test-adapter", 2) - # Should have slept timeout number of times - assert mock_sleep.call_count == 2 + # Verify error message includes adapter name and timeout + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "2s" in str(exc_info.value) @pytest.mark.asyncio async def test_start_adapter_dead_state(self, service): @@ -165,7 +165,8 @@ async def test_start_adapter_dead_state(self, service): with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.start_adapter("test-adapter", 10) - assert "adapter `test-adapter` is `DEAD`" in str(exc_info.value) + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "DEAD" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio @@ -177,7 +178,8 @@ async def test_start_adapter_deleted_state(self, service): with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.start_adapter("test-adapter", 10) - assert "adapter `test-adapter` is `DELETED`" in str(exc_info.value) + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "DELETED" in str(exc_info.value) @pytest.mark.asyncio async def test_start_adapter_state_transition_during_wait(self, service): @@ -257,18 +259,19 @@ async def test_stop_adapter_from_running_success(self, service): async def test_stop_adapter_timeout(self, service): """Test adapter stop timeout scenario""" with patch.object(service, "_get_adapter_health") as mock_health: - # Always return RUNNING (never transitions to STOPPED) - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, # Initial state - {"results": [{"state": "RUNNING"}]}, # After stop attempt - {"results": [{"state": "RUNNING"}]}, # Still running... - ] + # Initial state is RUNNING + mock_health.return_value = {"results": [{"state": "RUNNING"}]} - with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: - with pytest.raises(exceptions.TimeoutExceededError): + # Mock _poll_for_state to raise TimeoutError + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.side_effect = asyncio.TimeoutError() + + with pytest.raises(exceptions.TimeoutExceededError) as exc_info: await service.stop_adapter("test-adapter", 2) - assert mock_sleep.call_count == 2 + # Verify error message includes adapter name and timeout + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "2s" in str(exc_info.value) @pytest.mark.asyncio async def test_stop_adapter_dead_state(self, service): @@ -279,7 +282,8 @@ async def test_stop_adapter_dead_state(self, service): with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.stop_adapter("test-adapter", 10) - assert "adapter `test-adapter` is `DEAD`" in str(exc_info.value) + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "DEAD" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio @@ -291,7 +295,8 @@ async def test_stop_adapter_deleted_state(self, service): with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.stop_adapter("test-adapter", 10) - assert "adapter `test-adapter` is `DELETED`" in str(exc_info.value) + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "DELETED" in str(exc_info.value) class TestRestartAdapter: @@ -333,18 +338,19 @@ async def test_restart_adapter_from_running_success(self, service): async def test_restart_adapter_timeout(self, service): """Test adapter restart timeout scenario""" with patch.object(service, "_get_adapter_health") as mock_health: - # First call RUNNING, then adapter gets stuck in some other state - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, # Initial state - {"results": [{"state": "STOPPED"}]}, # After restart - stuck - {"results": [{"state": "STOPPED"}]}, # Still stopped... - ] + # Initial state is RUNNING + mock_health.return_value = {"results": [{"state": "RUNNING"}]} - with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: - with pytest.raises(exceptions.TimeoutExceededError): + # Mock _poll_for_state to raise TimeoutError + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.side_effect = asyncio.TimeoutError() + + with pytest.raises(exceptions.TimeoutExceededError) as exc_info: await service.restart_adapter("test-adapter", 2) - assert mock_sleep.call_count == 2 + # Verify error message includes adapter name and timeout + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "2s" in str(exc_info.value) @pytest.mark.asyncio async def test_restart_adapter_dead_state(self, service): @@ -355,7 +361,8 @@ async def test_restart_adapter_dead_state(self, service): with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.restart_adapter("test-adapter", 10) - assert "adapter `test-adapter` is `DEAD`" in str(exc_info.value) + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "DEAD" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio @@ -367,7 +374,8 @@ async def test_restart_adapter_deleted_state(self, service): with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.restart_adapter("test-adapter", 10) - assert "adapter `test-adapter` is `DELETED`" in str(exc_info.value) + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "DELETED" in str(exc_info.value) @pytest.mark.asyncio async def test_restart_adapter_stopped_state(self, service): @@ -378,7 +386,8 @@ async def test_restart_adapter_stopped_state(self, service): with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.restart_adapter("test-adapter", 10) - assert "adapter `test-adapter` is `STOPPED`" in str(exc_info.value) + assert "adapter 'test-adapter'" in str(exc_info.value) + assert "STOPPED" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio @@ -666,16 +675,61 @@ def mock_health_response(adapter_name): @pytest.mark.asyncio async def test_state_case_sensitivity(self, service): - """Test that adapter states are case sensitive""" + """Test that adapter states are case sensitive and invalid states raise ValueError""" with patch.object(service, "_get_adapter_health") as mock_health: - # Use lowercase state (should not be recognized as valid by the service logic) + # Use lowercase state (should not be recognized as valid enum value) mock_health.return_value = { "results": [{"id": "test-adapter", "state": "running"}] } - # The service should still return the result as-is since it just returns raw dict + # The service should raise ValueError when trying to convert invalid state to enum + with pytest.raises(ValueError) as exc_info: + await service.start_adapter("test-adapter", 10) + + # Verify the error is about invalid enum value + assert "running" in str(exc_info.value).lower() + + @pytest.mark.asyncio + async def test_start_adapter_in_starting_state(self, service): + """Test starting adapter that's already in STARTING state""" + with patch.object(service, "_get_adapter_health") as mock_health: + mock_health.return_value = { + "results": [{"id": "test-adapter", "state": "STARTING"}] + } + + # Should return immediately without calling PUT result = await service.start_adapter("test-adapter", 10) - assert result["state"] == "running" + + service.client.put.assert_not_called() + assert result["state"] == "STARTING" + + @pytest.mark.asyncio + async def test_stop_adapter_in_stopping_state(self, service): + """Test stopping adapter that's already in STOPPING state""" + with patch.object(service, "_get_adapter_health") as mock_health: + mock_health.return_value = { + "results": [{"id": "test-adapter", "state": "STOPPING"}] + } + + # Should return immediately without calling PUT + result = await service.stop_adapter("test-adapter", 10) + + service.client.put.assert_not_called() + assert result["state"] == "STOPPING" + + @pytest.mark.asyncio + async def test_restart_adapter_in_starting_state(self, service): + """Test restarting adapter that's in STARTING state (edge case)""" + with patch.object(service, "_get_adapter_health") as mock_health: + mock_health.return_value = { + "results": [{"id": "test-adapter", "state": "STARTING"}] + } + + # Should return immediately for intermediate states (not explicitly handled) + result = await service.restart_adapter("test-adapter", 10) + + service.client.put.assert_not_called() + assert result["state"] == "STARTING" class TestModelResponses: diff --git a/tests/test_services_applications.py b/tests/test_services_applications.py index c183d89..55e9fb8 100644 --- a/tests/test_services_applications.py +++ b/tests/test_services_applications.py @@ -2,6 +2,8 @@ # GNU General Public License v3.0+ (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later +import asyncio + import pytest from unittest.mock import AsyncMock, MagicMock, patch @@ -38,13 +40,9 @@ async def test_get_application_health_success(self, service, mock_client): "total": 1, "results": [ { - "results": [ - { - "id": "test-application", - "state": "RUNNING", - "version": "1.0.0", - } - ] + "id": "test-application", + "state": "RUNNING", + "version": "1.0.0", } ], } @@ -59,8 +57,8 @@ async def test_get_application_health_success(self, service, mock_client): ) # Verify result - should return data["results"][0] - assert result["results"][0]["id"] == "test-application" - assert result["results"][0]["state"] == "RUNNING" + assert result["id"] == "test-application" + assert result["state"] == "RUNNING" @pytest.mark.asyncio async def test_get_application_health_not_found(self, service, mock_client): @@ -114,26 +112,26 @@ def service(self, mock_client): async def test_start_application_already_running(self, service): """Test starting an application that's already running""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "RUNNING"}]} + mock_health.return_value = {"state": "RUNNING", "id": "test-application"} result = await service.start_application("test-application", 10) # Should return immediately without calling PUT service.client.put.assert_not_called() # Should return the health data - assert result["results"][0]["state"] == "RUNNING" + assert result["state"] == "RUNNING" @pytest.mark.asyncio async def test_start_application_from_stopped_success(self, service): """Test successfully starting a stopped application""" with patch.object(service, "_get_application_health") as mock_health: - # First call returns STOPPED, second returns RUNNING - mock_health.side_effect = [ - {"results": [{"state": "STOPPED"}]}, - {"results": [{"state": "RUNNING"}]}, - ] + # First call returns STOPPED + mock_health.return_value = {"state": "STOPPED", "id": "test-application"} + + # Mock _poll_for_state to return RUNNING state + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.return_value = {"state": "RUNNING", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock): result = await service.start_application("test-application", 10) # Should call PUT to start application @@ -141,52 +139,52 @@ async def test_start_application_from_stopped_success(self, service): "/applications/test-application/start" ) - # Should check health twice (initial + after start) - assert mock_health.call_count == 2 - # Should return the final health data - assert result["results"][0]["state"] == "RUNNING" + assert result["state"] == "RUNNING" @pytest.mark.asyncio async def test_start_application_timeout(self, service): """Test application start timeout scenario""" with patch.object(service, "_get_application_health") as mock_health: - # Always return STOPPED (never transitions to RUNNING) - mock_health.side_effect = [ - {"results": [{"state": "STOPPED"}]}, # Initial state - {"results": [{"state": "STOPPED"}]}, # After start attempt - {"results": [{"state": "STOPPED"}]}, # Still stopped... - ] + # Initial state is STOPPED + mock_health.return_value = {"state": "STOPPED", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: - with pytest.raises(exceptions.TimeoutExceededError): - await service.start_application("test-application", 2) + # Mock _poll_for_state to be an async function that never completes (simulates timeout) + async def never_completes(*args): + await asyncio.sleep(999999) # Sleep forever - # Should have slept timeout number of times - assert mock_sleep.call_count == 2 + with patch.object(service, "_poll_for_state", side_effect=never_completes): + with pytest.raises(exceptions.TimeoutExceededError) as exc_info: + await service.start_application("test-application", 0.1) + + # Verify error message includes application name and timeout + assert "application 'test-application'" in str(exc_info.value) + assert "0.1s" in str(exc_info.value) @pytest.mark.asyncio async def test_start_application_dead_state(self, service): """Test starting an application in DEAD state""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "DEAD"}]} + mock_health.return_value = {"state": "DEAD", "id": "test-application"} with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.start_application("test-application", 10) - assert "application `test-application` is `DEAD`" in str(exc_info.value) + assert "application 'test-application'" in str(exc_info.value) + assert "DEAD" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio async def test_start_application_deleted_state(self, service): """Test starting an application in DELETED state""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "DELETED"}]} + mock_health.return_value = {"state": "DELETED", "id": "test-application"} with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.start_application("test-application", 10) - assert "application `test-application` is `DELETED`" in str(exc_info.value) + assert "application 'test-application'" in str(exc_info.value) + assert "DELETED" in str(exc_info.value) class TestStopApplication: @@ -208,26 +206,26 @@ def service(self, mock_client): async def test_stop_application_already_stopped(self, service): """Test stopping an application that's already stopped""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "STOPPED"}]} + mock_health.return_value = {"state": "STOPPED", "id": "test-application"} result = await service.stop_application("test-application", 10) # Should return immediately without calling PUT service.client.put.assert_not_called() # Should return the health data - assert result["results"][0]["state"] == "STOPPED" + assert result["state"] == "STOPPED" @pytest.mark.asyncio async def test_stop_application_from_running_success(self, service): """Test successfully stopping a running application""" with patch.object(service, "_get_application_health") as mock_health: - # First call returns RUNNING, second returns STOPPED - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, - {"results": [{"state": "STOPPED"}]}, - ] + # First call returns RUNNING + mock_health.return_value = {"state": "RUNNING", "id": "test-application"} + + # Mock _poll_for_state to return STOPPED state + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.return_value = {"state": "STOPPED", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock): result = await service.stop_application("test-application", 10) # Should call PUT to stop application @@ -236,63 +234,64 @@ async def test_stop_application_from_running_success(self, service): ) # Should return the final health data - assert result["results"][0]["state"] == "STOPPED" + assert result["state"] == "STOPPED" @pytest.mark.asyncio async def test_stop_application_timeout(self, service): """Test application stop timeout scenario""" with patch.object(service, "_get_application_health") as mock_health: - # Always return RUNNING (never transitions to STOPPED) - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, # Initial state - {"results": [{"state": "RUNNING"}]}, # After stop attempt - {"results": [{"state": "RUNNING"}]}, # Still running... - ] + # Initial state is RUNNING + mock_health.return_value = {"state": "RUNNING", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: - with pytest.raises(exceptions.TimeoutExceededError): - await service.stop_application("test-application", 2) + # Mock _poll_for_state to be an async function that never completes (simulates timeout) + async def never_completes(*args): + await asyncio.sleep(999999) # Sleep forever - # Should have slept timeout number of times - assert mock_sleep.call_count == 2 + with patch.object(service, "_poll_for_state", side_effect=never_completes): + with pytest.raises(exceptions.TimeoutExceededError) as exc_info: + await service.stop_application("test-application", 0.1) + + # Verify error message includes application name and timeout + assert "application 'test-application'" in str(exc_info.value) + assert "0.1s" in str(exc_info.value) @pytest.mark.asyncio async def test_stop_application_dead_state(self, service): """Test stopping an application in DEAD state""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "DEAD"}]} + mock_health.return_value = {"state": "DEAD", "id": "test-application"} with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.stop_application("test-application", 10) - assert "application `test-application` is `DEAD`" in str(exc_info.value) + assert "application 'test-application'" in str(exc_info.value) + assert "DEAD" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio async def test_stop_application_deleted_state(self, service): """Test stopping an application in DELETED state""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "DELETED"}]} + mock_health.return_value = {"state": "DELETED", "id": "test-application"} with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.stop_application("test-application", 10) - assert "application `test-application` is `DELETED`" in str(exc_info.value) + assert "application 'test-application'" in str(exc_info.value) + assert "DELETED" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio async def test_stop_application_multiple_pages(self, service): - """Test stopping application with pagination during wait loop""" + """Test stopping application with multiple poll iterations""" with patch.object(service, "_get_application_health") as mock_health: - # First call returns RUNNING, then STOPPED after several checks - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, # Initial check - {"results": [{"state": "RUNNING"}]}, # First wait check - {"results": [{"state": "RUNNING"}]}, # Second wait check - {"results": [{"state": "STOPPED"}]}, # Finally stopped - ] + # First call returns RUNNING + mock_health.return_value = {"state": "RUNNING", "id": "test-application"} + + # Mock _poll_for_state to simulate multiple iterations before success + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.return_value = {"state": "STOPPED", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock): result = await service.stop_application("test-application", 10) # Should call PUT to stop application @@ -300,9 +299,8 @@ async def test_stop_application_multiple_pages(self, service): "/applications/test-application/stop" ) - # Should check health multiple times - assert mock_health.call_count == 4 - assert result["results"][0]["state"] == "STOPPED" + # Should return the final state + assert result["state"] == "STOPPED" class TestRestartApplication: @@ -324,13 +322,13 @@ def service(self, mock_client): async def test_restart_application_from_running_success(self, service): """Test successfully restarting a running application""" with patch.object(service, "_get_application_health") as mock_health: - # First call returns RUNNING, second returns RUNNING after restart - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, - {"results": [{"state": "RUNNING"}]}, - ] + # First call returns RUNNING + mock_health.return_value = {"state": "RUNNING", "id": "test-application"} + + # Mock _poll_for_state to return RUNNING state after restart + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.return_value = {"state": "RUNNING", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock): result = await service.restart_application("test-application", 10) # Should call PUT to restart application @@ -339,75 +337,77 @@ async def test_restart_application_from_running_success(self, service): ) # Should return the final health data - assert result["results"][0]["state"] == "RUNNING" + assert result["state"] == "RUNNING" @pytest.mark.asyncio async def test_restart_application_stopped_state(self, service): """Test restarting an application in STOPPED state""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "STOPPED"}]} + mock_health.return_value = {"state": "STOPPED", "id": "test-application"} with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.restart_application("test-application", 10) - assert "application `test-application` is `STOPPED`" in str(exc_info.value) + assert "application 'test-application'" in str(exc_info.value) + assert "STOPPED" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio async def test_restart_application_dead_state(self, service): """Test restarting an application in DEAD state""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "DEAD"}]} + mock_health.return_value = {"state": "DEAD", "id": "test-application"} with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.restart_application("test-application", 10) - assert "application `test-application` is `DEAD`" in str(exc_info.value) + assert "application 'test-application'" in str(exc_info.value) + assert "DEAD" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio async def test_restart_application_deleted_state(self, service): """Test restarting an application in DELETED state""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "DELETED"}]} + mock_health.return_value = {"state": "DELETED", "id": "test-application"} with pytest.raises(exceptions.InvalidStateError) as exc_info: await service.restart_application("test-application", 10) - assert "application `test-application` is `DELETED`" in str(exc_info.value) + assert "application 'test-application'" in str(exc_info.value) + assert "DELETED" in str(exc_info.value) service.client.put.assert_not_called() @pytest.mark.asyncio async def test_restart_application_timeout(self, service): """Test application restart timeout scenario""" with patch.object(service, "_get_application_health") as mock_health: - # Always return RUNNING at first, then stays in intermediate state - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, # Initial state - {"results": [{"state": "RESTARTING"}]}, # After restart attempt - {"results": [{"state": "RESTARTING"}]}, # Still restarting... - ] + # Initial state is RUNNING + mock_health.return_value = {"state": "RUNNING", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: - with pytest.raises(exceptions.TimeoutExceededError): - await service.restart_application("test-application", 2) + # Mock _poll_for_state to be an async function that never completes (simulates timeout) + async def never_completes(*args): + await asyncio.sleep(999999) # Sleep forever - # Should have slept timeout number of times - assert mock_sleep.call_count == 2 + with patch.object(service, "_poll_for_state", side_effect=never_completes): + with pytest.raises(exceptions.TimeoutExceededError) as exc_info: + await service.restart_application("test-application", 0.1) + + # Verify error message includes application name and timeout + assert "application 'test-application'" in str(exc_info.value) + assert "0.1s" in str(exc_info.value) @pytest.mark.asyncio async def test_restart_application_multiple_state_changes(self, service): """Test restarting application through multiple state changes""" with patch.object(service, "_get_application_health") as mock_health: - # Simulate application going through restart states - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, # Initial check - {"results": [{"state": "RESTARTING"}]}, # First wait check - {"results": [{"state": "STARTING"}]}, # Second wait check - {"results": [{"state": "RUNNING"}]}, # Finally running - ] + # Initial state is RUNNING + mock_health.return_value = {"state": "RUNNING", "id": "test-application"} + + # Mock _poll_for_state to return RUNNING state after going through restart states + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.return_value = {"state": "RUNNING", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock): result = await service.restart_application("test-application", 10) # Should call PUT to restart application @@ -415,23 +415,20 @@ async def test_restart_application_multiple_state_changes(self, service): "/applications/test-application/restart" ) - # Should check health multiple times - assert mock_health.call_count == 4 - assert result["results"][0]["state"] == "RUNNING" + # Should return the final state + assert result["state"] == "RUNNING" @pytest.mark.asyncio async def test_restart_application_already_running_no_change_needed(self, service): """Test restarting application that immediately returns to RUNNING""" with patch.object(service, "_get_application_health") as mock_health: - # Application is RUNNING before and after restart - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, # Initial check - { - "results": [{"state": "RUNNING"}] - }, # Immediately running after restart - ] + # Application is RUNNING before restart + mock_health.return_value = {"state": "RUNNING", "id": "test-application"} + + # Mock _poll_for_state to immediately return RUNNING state + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.return_value = {"state": "RUNNING", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock): result = await service.restart_application("test-application", 10) # Should still call PUT to restart @@ -440,7 +437,7 @@ async def test_restart_application_already_running_no_change_needed(self, servic ) # Should return running state - assert result["results"][0]["state"] == "RUNNING" + assert result["state"] == "RUNNING" class TestServiceIntegration: @@ -578,10 +575,14 @@ async def test_unicode_application_name(self, service, mock_client): async def test_zero_timeout_behavior(self, service, mock_client): """Test behavior with zero timeout""" with patch.object(service, "_get_application_health") as mock_health: - mock_health.return_value = {"results": [{"state": "STOPPED"}]} + mock_health.return_value = {"state": "STOPPED", "id": "test-application"} + + # Mock _poll_for_state to raise TimeoutError + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.side_effect = asyncio.TimeoutError() - with pytest.raises(exceptions.TimeoutExceededError): - await service.start_application("test-application", 0) + with pytest.raises(exceptions.TimeoutExceededError): + await service.start_application("test-application", 0) # Should call PUT but timeout immediately since timeout=0 service.client.put.assert_called_once_with( @@ -590,40 +591,39 @@ async def test_zero_timeout_behavior(self, service, mock_client): @pytest.mark.asyncio async def test_negative_timeout_behavior(self, service, mock_client): - """Test behavior with negative timeout (demonstrates infinite loop issue)""" + """Test behavior with negative timeout (immediately times out)""" with patch.object(service, "_get_application_health") as mock_health: - # Return STOPPED first, then RUNNING after first attempt - mock_health.side_effect = [ - {"results": [{"state": "STOPPED"}]}, - { - "results": [{"state": "RUNNING"}] - }, # Immediately running to break loop - ] + # Return STOPPED first + mock_health.return_value = {"state": "STOPPED", "id": "test-application"} + + # asyncio.wait_for with negative timeout will immediately timeout + with pytest.raises(exceptions.TimeoutExceededError) as exc_info: + await service.start_application("test-application", -1) - # Should complete successfully with negative timeout if app starts immediately - result = await service.start_application("test-application", -1) + # Verify error message + assert "application 'test-application'" in str(exc_info.value) + assert "-1s" in str(exc_info.value) - # Should call PUT and succeed if application transitions to RUNNING immediately + # Should have called PUT before the timeout service.client.put.assert_called_once_with( "/applications/test-application/start" ) - assert result["results"][0]["state"] == "RUNNING" @pytest.mark.asyncio async def test_large_timeout_value(self, service, mock_client): """Test behavior with very large timeout value""" with patch.object(service, "_get_application_health") as mock_health: - # Return STOPPED first, then RUNNING - mock_health.side_effect = [ - {"results": [{"state": "STOPPED"}]}, - {"results": [{"state": "RUNNING"}]}, - ] + # Return STOPPED first + mock_health.return_value = {"state": "STOPPED", "id": "test-application"} + + # Mock _poll_for_state to return RUNNING state + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.return_value = {"state": "RUNNING", "id": "test-application"} - with patch("asyncio.sleep", new_callable=AsyncMock): result = await service.start_application("test-application", 999999) # Should work normally with large timeout - assert result["results"][0]["state"] == "RUNNING" + assert result["state"] == "RUNNING" @pytest.mark.asyncio async def test_start_application_not_found_error_propagation(self, service): @@ -660,8 +660,8 @@ async def test_concurrent_application_operations(self, service): with patch.object(service, "_get_application_health") as mock_health: # Mock different responses for different calls mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, # For first operation - {"results": [{"state": "STOPPED"}]}, # For second operation + {"state": "RUNNING", "id": "app1"}, # For first operation + {"state": "STOPPED", "id": "app2"}, # For second operation ] # Run two operations concurrently (though they should be independent) @@ -672,8 +672,8 @@ async def test_concurrent_application_operations(self, service): result1, result2 = await asyncio.gather(task1, task2) # Verify results - assert result1["results"][0]["state"] == "RUNNING" - assert result2["results"][0]["state"] == "STOPPED" + assert result1["state"] == "RUNNING" + assert result2["state"] == "STOPPED" class TestApplicationStateTransitions: @@ -695,47 +695,100 @@ def service(self, mock_client): async def test_complex_restart_state_transition(self, service): """Test restart with complex state transitions""" with patch.object(service, "_get_application_health") as mock_health: - # Simulate realistic restart state progression - mock_health.side_effect = [ - {"results": [{"state": "RUNNING"}]}, # Initial state - {"results": [{"state": "STOPPING"}]}, # Stopping phase - {"results": [{"state": "STOPPED"}]}, # Stopped phase - {"results": [{"state": "STARTING"}]}, # Starting phase - {"results": [{"state": "RUNNING"}]}, # Finally running - ] + # Initial state is RUNNING + mock_health.return_value = {"state": "RUNNING", "id": "test-app"} + + # Mock _poll_for_state to simulate going through multiple states before RUNNING + with patch.object(service, "_poll_for_state") as mock_poll: + mock_poll.return_value = {"state": "RUNNING", "id": "test-app"} - with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: result = await service.restart_application("test-app", 10) - # Should go through multiple state checks - assert mock_health.call_count == 5 - assert ( - mock_sleep.call_count == 3 - ) # Sleep after non-RUNNING states (4 checks - 1 final = 3 sleeps) - assert result["results"][0]["state"] == "RUNNING" + # Should return final state + assert result["state"] == "RUNNING" @pytest.mark.asyncio async def test_start_from_error_state(self, service): - """Test starting application from an ERROR state""" + """Test starting application from an invalid ERROR state""" with patch.object(service, "_get_application_health") as mock_health: - # Application in ERROR state initially - will not trigger PUT call - mock_health.return_value = {"results": [{"state": "ERROR"}]} + # Application in ERROR state - not a valid enum value, should raise ValueError + mock_health.return_value = {"state": "ERROR", "id": "test-app"} - result = await service.start_application("test-app", 10) + # Should raise ValueError when trying to convert invalid state to enum + with pytest.raises(ValueError) as exc_info: + await service.start_application("test-app", 10) - # Should not attempt to start from ERROR state (only STOPPED triggers PUT) + # Verify the error is about invalid enum value + assert "ERROR" in str(exc_info.value) service.client.put.assert_not_called() - assert result["results"][0]["state"] == "ERROR" @pytest.mark.asyncio async def test_unknown_state_handling(self, service): """Test handling of unknown application states""" with patch.object(service, "_get_application_health") as mock_health: - # Unknown state initially - will not trigger PUT call - mock_health.return_value = {"results": [{"state": "UNKNOWN"}]} + # Unknown state - not a valid enum value, should raise ValueError + mock_health.return_value = {"state": "UNKNOWN", "id": "test-app"} + + # Should raise ValueError when trying to convert invalid state to enum + with pytest.raises(ValueError) as exc_info: + await service.start_application("test-app", 10) + + # Verify the error is about invalid enum value + assert "UNKNOWN" in str(exc_info.value) + service.client.put.assert_not_called() + + @pytest.mark.asyncio + async def test_start_application_in_starting_state(self, service): + """Test starting application that's already in STARTING state (edge case)""" + with patch.object(service, "_get_application_health") as mock_health: + mock_health.return_value = {"state": "STARTING", "id": "test-app"} + # Should return immediately for intermediate states result = await service.start_application("test-app", 10) - # Should not attempt to start from UNKNOWN state (only STOPPED triggers PUT) service.client.put.assert_not_called() - assert result["results"][0]["state"] == "UNKNOWN" + assert result["state"] == "STARTING" + + @pytest.mark.asyncio + async def test_stop_application_in_stopping_state(self, service): + """Test stopping application that's already in STOPPING state (edge case)""" + with patch.object(service, "_get_application_health") as mock_health: + mock_health.return_value = {"state": "STOPPING", "id": "test-app"} + + # Should return immediately for intermediate states + result = await service.stop_application("test-app", 10) + + service.client.put.assert_not_called() + assert result["state"] == "STOPPING" + + @pytest.mark.asyncio + async def test_restart_application_in_starting_state(self, service): + """Test restarting application that's in STARTING state (edge case)""" + with patch.object(service, "_get_application_health") as mock_health: + mock_health.return_value = {"state": "STARTING", "id": "test-app"} + + # Should return immediately for intermediate states + result = await service.restart_application("test-app", 10) + + service.client.put.assert_not_called() + assert result["state"] == "STARTING" + + @pytest.mark.asyncio + async def test_poll_for_state_method_directly(self, service): + """Test _poll_for_state internal method directly""" + with patch.object(service, "_get_application_health") as mock_health: + # First two calls return STARTING, third returns RUNNING + mock_health.side_effect = [ + {"state": "STARTING", "id": "test-app"}, + {"state": "STARTING", "id": "test-app"}, + {"state": "RUNNING", "id": "test-app"}, + ] + + with patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + # Poll until RUNNING state is reached + result = await service._poll_for_state("test-app", "RUNNING") + + # Should have polled 3 times and slept 2 times + assert mock_health.call_count == 3 + assert mock_sleep.call_count == 2 + assert result["state"] == "RUNNING" diff --git a/tests/test_services_automation_studio.py b/tests/test_services_automation_studio.py index a90692d..09b6b62 100644 --- a/tests/test_services_automation_studio.py +++ b/tests/test_services_automation_studio.py @@ -92,6 +92,35 @@ async def test_describe_workflow_multiple_found(self, service, mock_client): assert "workflow not found" in str(exc_info.value) + @pytest.mark.asyncio + async def test_describe_workflow_with_name(self, service, mock_client): + """Test describing workflow by name""" + # Mock response data + mock_response = MagicMock() + mock_response.json.return_value = { + "total": 1, + "items": [ + { + "_id": "workflow-456", + "name": "My Workflow", + "description": "Test workflow by name", + "type": "automation", + } + ], + } + mock_client.get.return_value = mock_response + + result = await service.describe_workflow_with_name("My Workflow") + + # Verify the client was called with the correct name parameter + mock_client.get.assert_called_once_with( + "/automation-studio/workflows", params={"equals[name]": "My Workflow"} + ) + + # Verify result structure + assert result["_id"] == "workflow-456" + assert result["name"] == "My Workflow" + @pytest.mark.asyncio async def test_get_templates_no_filter(self, service, mock_client): """Test getting all templates without filtering""" diff --git a/tests/test_services_configuration_manager.py b/tests/test_services_configuration_manager.py index 3bb7961..fef355e 100644 --- a/tests/test_services_configuration_manager.py +++ b/tests/test_services_configuration_manager.py @@ -356,6 +356,34 @@ async def test_create_golden_config_tree_server_error(self, service, mock_client # Just verify that the ServerException was raised correctly assert isinstance(exc_info.value, exceptions.ServerException) + @pytest.mark.asyncio + async def test_create_golden_config_tree_http_error_without_response( + self, service, mock_client + ): + """Test create_golden_config_tree with HTTPStatusError that has no response attribute.""" + # Mock the parser endpoint for device type validation + parser_mock_response = Mock() + parser_mock_response.json.return_value = { + "list": [{"name": "cisco_ios"}, {"name": "juniper"}] + } + mock_client.get.return_value = parser_mock_response + + # Create an HTTPStatusError without a response attribute (edge case) + http_error = Exception("Network error") + server_error = ipsdk.exceptions.HTTPStatusError(http_error) + # Ensure response attribute doesn't exist or is None + if hasattr(server_error, "response"): + object.__setattr__(server_error, "response", None) + mock_client.post.side_effect = server_error + + with pytest.raises(exceptions.ServerException) as exc_info: + await service.create_golden_config_tree( + name="test-tree", device_type="cisco_ios" + ) + + # Verify that the ServerException was raised correctly + assert isinstance(exc_info.value, exceptions.ServerException) + @pytest.mark.asyncio async def test_create_golden_config_tree_invalid_device_type( self, service, mock_client @@ -536,7 +564,7 @@ async def test_add_golden_config_node_tree_not_found(self, service): template="template", ) - assert "tree non-existent-tree could not be found" in str(exc_info.value) + assert "tree 'non-existent-tree' not found" in str(exc_info.value) @pytest.mark.asyncio async def test_add_golden_config_node_server_error(self, service, mock_client): @@ -567,6 +595,35 @@ async def test_add_golden_config_node_server_error(self, service, mock_client): # Just verify that the ServerException was raised correctly assert isinstance(exc_info.value, exceptions.ServerException) + @pytest.mark.asyncio + async def test_add_golden_config_node_http_error_without_response( + self, service, mock_client + ): + """Test add_golden_config_node with HTTPStatusError that has no response attribute.""" + trees_data = [{"id": "tree-1", "name": "test-tree", "deviceType": "cisco_ios"}] + + service.get_golden_config_trees = AsyncMock(return_value=trees_data) + + # Create an HTTPStatusError without a response attribute (edge case) + http_error = Exception("Network error") + server_error = ipsdk.exceptions.HTTPStatusError(http_error) + # Ensure response attribute doesn't exist or is None + if hasattr(server_error, "response"): + object.__setattr__(server_error, "response", None) + mock_client.post.side_effect = server_error + + with pytest.raises(exceptions.ServerException) as exc_info: + await service.add_golden_config_node( + tree_name="test-tree", + version="v1.0", + path="base", + name="interface-config", + template="template", + ) + + # Verify that the ServerException was raised correctly + assert isinstance(exc_info.value, exceptions.ServerException) + @pytest.mark.asyncio async def test_add_golden_config_node_without_template(self, service, mock_client): """Test adding a golden config node without template.""" @@ -928,7 +985,7 @@ async def test_create_device_group_duplicate_name_error(self, service, mock_clie service.get_device_groups = AsyncMock(return_value=existing_groups) with pytest.raises( - ValueError, match="device group Existing Group already exists" + ValueError, match="device group 'Existing Group' already exists" ): await service.create_device_group( name="Existing Group", diff --git a/tests/test_services_gateway_manager.py b/tests/test_services_gateway_manager.py index bd00f03..a91f634 100644 --- a/tests/test_services_gateway_manager.py +++ b/tests/test_services_gateway_manager.py @@ -268,7 +268,9 @@ async def test_run_service_success_with_params(self, service, mock_client): mock_response.json.return_value = expected_result mock_client.post.return_value = mock_response - result = await service.run_service(service_name, cluster_name, input_params=input_params) + result = await service.run_service( + service_name, cluster_name, input_params=input_params + ) # Verify client was called with correct parameters including input params expected_body = { @@ -963,7 +965,10 @@ async def test_update_certificate_all_fields(self, service, mock_client): mock_client.put.return_value = mock_response result = await service.update_certificate( - cert_id, alias=new_alias, contract_id=new_contract_id, raw_certificate=new_cert + cert_id, + alias=new_alias, + contract_id=new_contract_id, + raw_certificate=new_cert, ) expected_body = { diff --git a/tests/test_services_lifecycle_manager.py b/tests/test_services_lifecycle_manager.py index ad6ba9b..742046f 100644 --- a/tests/test_services_lifecycle_manager.py +++ b/tests/test_services_lifecycle_manager.py @@ -431,6 +431,75 @@ async def test_get_instances_multiple_pages(self, service): # Verify we got all the data assert len([x for x in result if x["name"].startswith("instance")]) == 120 + @pytest.mark.asyncio + async def test_get_instances_pagination_exception(self, service): + """Test get_instances handles pagination exceptions""" + # Mock describe_resource + resource_response = {"_id": "model-123", "name": "test-resource"} + service.describe_resource = AsyncMock(return_value=resource_response) + + # First page succeeds + first_response = MagicMock() + first_response.json.return_value = { + "data": [{"_id": "1", "name": "instance1"}], + "metadata": {"total": 250}, # More than one page + } + + # Mock _fetch_page to fail on one of the parallel requests + # For skip values, with total=250 and limit=100, we'll have: + # - skip=0 (first page, already retrieved) + # - skip=100 (second parallel fetch) + # - skip=200 (third parallel fetch) + call_count = [0] + + async def mock_fetch_page(endpoint, skip, limit): + call_count[0] += 1 + # Fail on first parallel call + if call_count[0] == 1: # First call to _fetch_page + raise Exception("Network error during fetch") + return [{"_id": f"{skip}", "name": f"instance{skip}"}] + + service._fetch_page = mock_fetch_page + service.client.get.return_value = first_response + + # Should propagate the exception from the failed page + with pytest.raises(Exception, match="Network error during fetch"): + await service.get_instances("test-resource") + + @pytest.mark.asyncio + async def test_get_instances_successful_parallel_pagination(self, service): + """Test get_instances with successful parallel pagination""" + # Mock describe_resource + resource_response = {"_id": "model-123", "name": "test-resource"} + service.describe_resource = AsyncMock(return_value=resource_response) + + # First page succeeds with 250 total items + first_response = MagicMock() + first_response.json.return_value = { + "data": [ + {"_id": f"instance-{i}", "name": f"instance-{i}"} for i in range(100) + ], + "metadata": {"total": 250}, + } + + # Mock _fetch_page to successfully return remaining pages + async def mock_fetch_page(endpoint, skip, limit): + return [ + {"_id": f"instance-{i}", "name": f"instance-{i}"} + for i in range(skip, min(skip + limit, 250)) + ] + + service._fetch_page = mock_fetch_page + service.client.get.return_value = first_response + + result = await service.get_instances("test-resource") + + # Should have all 250 instances + assert len(result) == 250 + # Verify first and last items + assert result[0]["_id"] == "instance-0" + assert result[249]["_id"] == "instance-249" + class TestDescribeInstance: """Test the describe_instance method""" diff --git a/tests/test_services_mop.py b/tests/test_services_mop.py index 32aa74a..6feb6f5 100644 --- a/tests/test_services_mop.py +++ b/tests/test_services_mop.py @@ -495,6 +495,62 @@ async def test_update_command_template_global(self, mop_service, mock_client): assert call_args[1]["json"]["mop"]["commands"] == commands assert call_args[1]["json"]["mop"]["description"] == "Updated template" + @pytest.mark.asyncio + async def test_update_command_template_with_project(self, mop_service, mock_client): + """Test updating a command template within a project scope.""" + commands = [ + { + "command": "show version", + "passRule": True, + "rules": [{"rule": "IOS", "eval": "contains", "severity": "info"}], + } + ] + + # Mock project ID lookup (for _get_project_id_from_name) + projects_response = { + "data": [{"_id": "project-123", "name": "Test Project"}], + "metadata": {"total": 1}, + } + mock_projects_response = MagicMock() + mock_projects_response.json.return_value = projects_response + + # Mock existing template lookup with project-scoped name (for describe_command_template) + existing_template = { + "_id": "test_template", + "name": "@project-123: test_template", + "created": 1757610875214, + "createdBy": "test@example.com", + } + mock_get_template_response = MagicMock() + mock_get_template_response.json.return_value = [existing_template] + + expected_response = { + "acknowledged": True, + "modifiedCount": 1, + } + mock_put_response = MagicMock() + mock_put_response.json.return_value = expected_response + + # Set up side effects for multiple GET calls: + # 1. Get project ID by name (in update_command_template) + # 2. Get project ID by name again (in describe_command_template) + # 3. Get existing template by project-scoped name + mock_client.get.side_effect = [ + mock_projects_response, + mock_projects_response, + mock_get_template_response, + ] + mock_client.put.return_value = mock_put_response + + result = await mop_service.update_command_template( + name="test_template", commands=commands, project="Test Project" + ) + + assert result == expected_response + # Verify PUT was called with project-scoped name + call_args = mock_client.put.call_args + assert "@project-123: test_template" in call_args[0][0] + @pytest.mark.asyncio async def test_update_command_template_not_found(self, mop_service, mock_client): """Test updating a non-existent command template."""