diff --git a/src/tower/__init__.py b/src/tower/__init__.py index 129529ba..4dbeca74 100644 --- a/src/tower/__init__.py +++ b/src/tower/__init__.py @@ -13,6 +13,7 @@ from ._client import ( run_app, wait_for_run, + wait_for_runs, ) from ._features import override_get_attr, get_available_features, is_feature_enabled diff --git a/src/tower/_client.py b/src/tower/_client.py index d30c6ada..1e371a85 100644 --- a/src/tower/_client.py +++ b/src/tower/_client.py @@ -1,8 +1,18 @@ import os import time -from typing import Dict, Optional +import httpx +from typing import List, Dict, Optional from ._context import TowerContext +from .exceptions import ( + NotFoundException, + UnauthorizedException, + UnknownException, + UnhandledRunStateException, + RunFailedError, + TimeoutException, +) + from .tower_api_client import AuthenticatedClient from .tower_api_client.api.default import describe_run as describe_run_api from .tower_api_client.api.default import run_app as run_app_api @@ -27,23 +37,9 @@ # app somewhere. DEFAULT_TOWER_ENVIRONMENT = "default" - -def _env_client(ctx: TowerContext) -> AuthenticatedClient: - tower_url = ctx.tower_url - - if not tower_url.endswith("/v1"): - if tower_url.endswith("/"): - tower_url += "v1" - else: - tower_url += "/v1" - - return AuthenticatedClient( - verify_ssl=False, - base_url=tower_url, - token=ctx.api_key, - auth_header_name="X-API-Key", - prefix="", - ) +# DEFAULT_NUM_TIMEOUT_RETRIES is the number of times to retry querying the Tower +# API before we just give up entirely. +DEFAULT_NUM_TIMEOUT_RETRIES = 5 def run_app( @@ -52,9 +48,26 @@ def run_app( parameters: Optional[Dict[str, str]] = None, ) -> Run: """ - `run_app` invokes an app based on the configured environment. You can - supply an optional `environment` override, and an optional dict - `parameters` to pass into the app. + Run a Tower application with specified parameters and environment. + + This function initiates a new run of a Tower application identified by its slug. 
+ The run can be configured with an optional environment override and runtime parameters. + If no environment is specified, the default environment from the Tower context is used. + + Args: + slug (str): The unique identifier of the application to run. + environment (Optional[str]): The environment to run the application in. + If not provided, uses the default environment from the Tower context. + parameters (Optional[Dict[str, str]]): A dictionary of key-value pairs + to pass as parameters to the application run. + + Returns: + Run: A Run object containing information about the initiated application run, + including the app_slug and run number. + + Raises: + RuntimeError: If there is an error initiating the run or if the Tower API + returns an error response. """ ctx = TowerContext.build() client = _env_client(ctx) @@ -84,17 +97,234 @@ def run_app( return output.run -def wait_for_run(run: Run) -> None: +def wait_for_run( + run: Run, + timeout: Optional[float] = 86_400.0, # one day + raise_on_failure: bool = False, +) -> Run: """ - `wait_for_run` waits for a run to reach a terminal state by polling the - Tower API every 2 seconds for the latest status. If the app returns a - terminal status (`exited`, `errored`, `cancelled`, or `crashed`) then this - function returns. + Wait for a Tower app run to reach a terminal state by polling the Tower API. + + This function continuously polls the Tower API every 2 seconds (defined by WAIT_TIMEOUT) + to check the status of the specified run. The function returns when the run reaches + a terminal state (exited, errored, cancelled, or crashed). + + Args: + run (Run): The Run object containing the app_slug and number of the run to monitor. + timeout (Optional[float]): Maximum time to wait in seconds before raising a + TimeoutException. Defaults to one day (86,400 seconds). + raise_on_failure (bool): If True, raises a RunFailedError when the run fails. + If False, returns the failed run object. Defaults to False. 
+ + Returns: + Run: The final state of the run after completion or failure. + + Raises: + TimeoutException: If the specified timeout is reached before the run completes. + RunFailedError: If raise_on_failure is True and the run fails. + UnhandledRunStateException: If the run enters an unexpected state. + UnknownException: If there are persistent problems communicating with the Tower API. + NotFoundException: If the run cannot be found. + UnauthorizedException: If the API key is invalid or unauthorized. """ ctx = TowerContext.build() - client = _env_client(ctx) + retries = 0 + + # We use this to track the timeout, if one is defined. + start_time = time.time() while True: + # We check for a timeout at the top of the loop because we want to + # avoid waiting unnecessarily for the timeout hitting the Tower API if + # we've encountered some sort of operational problem there. + if timeout is not None: + if _time_since(start_time) > timeout: + raise TimeoutException(_time_since(start_time)) + + # We time this out to avoid waiting forever on the API. + try: + desc = _check_run_status(ctx, run, timeout=2.0) + retries = 0 + + if _is_successful_run(desc): + return desc + elif _is_failed_run(desc): + if raise_on_failure: + raise RunFailedError(desc.app_slug, desc.number, desc.status) + else: + return desc + + elif _is_run_awaiting_completion(desc): + time.sleep(WAIT_TIMEOUT) + else: + raise UnhandledRunStateException(desc.status) + except TimeoutException: + # timed out in the API, we want to keep trying this for a while + # (assuming we didn't hit the global timeout limit) until we give + # up entirely. 
+ retries += 1 + + if retries >= DEFAULT_NUM_TIMEOUT_RETRIES: + raise UnknownException("There was a problem with the Tower API.") + + +def wait_for_runs( + runs: List[Run], + timeout: Optional[float] = 86_400.0, # one day + raise_on_failure: bool = False, +) -> tuple[List[Run], List[Run]]: + """ + Wait for multiple Tower app runs to reach terminal states by polling the Tower API. + + This function continuously polls the Tower API every 2 seconds (defined by WAIT_TIMEOUT) + to check the status of all specified runs. The function returns when all runs reach + terminal states (`exited`, `errored`, `cancelled`, or `crashed`). + + Args: + runs (List[Run]): A list of Run objects to monitor. The list itself is not modified. + timeout (Optional[float]): Maximum time to wait in seconds before raising a + TimeoutException. Defaults to one day (86,400 seconds). + raise_on_failure (bool): If True, raises a RunFailedError when any run fails. + If False, failed runs are returned in the failed_runs list. Defaults to False. + + Returns: + tuple[List[Run], List[Run]]: A tuple containing two lists: + - successful_runs: List of runs that completed successfully (status: 'exited') + - failed_runs: List of runs that failed (status: 'crashed', 'cancelled', or 'errored') + + Raises: + TimeoutException: If the specified timeout is reached before all runs complete. + RunFailedError: If raise_on_failure is True and any run fails. + UnhandledRunStateException: If a run enters an unexpected state. + UnknownException: If there are persistent problems communicating with the Tower API. + NotFoundException: If any run cannot be found. + UnauthorizedException: If the API key is invalid or unauthorized. + """ + ctx = TowerContext.build() + retries = 0 + + # We use this to track the timeout, if one is defined. 
+ start_time = time.time() + + awaiting_runs = list(runs) # copy: pop(0) below must not drain the caller's list + successful_runs = [] + failed_runs = [] + + while len(awaiting_runs) > 0: + run = awaiting_runs.pop(0) + + # Check the overall timeout at the top of the loop in case we've + # spent a load of time deeper inside the loop on retries, etc. + if timeout is not None: + if _time_since(start_time) > timeout: + raise TimeoutException(_time_since(start_time)) + + try: + desc = _check_run_status(ctx, run, timeout=2.0) + retries = 0 + + if _is_successful_run(desc): + successful_runs.append(desc) + elif _is_failed_run(desc): + if raise_on_failure: + raise RunFailedError(desc.app_slug, desc.number, desc.status) + else: + failed_runs.append(desc) + + elif _is_run_awaiting_completion(desc): + time.sleep(WAIT_TIMEOUT) + + # We need to re-add this run to the list so we check it again + # in the future. We add it to the back since we took it off the + # front, effectively moving to the next run. + awaiting_runs.append(run) + else: + raise UnhandledRunStateException(desc.status) + except TimeoutException: + # timed out in the API, we want to keep trying this for a while + # (assuming we didn't hit the global timeout limit) until we give + # up entirely. + retries += 1 + + if retries >= DEFAULT_NUM_TIMEOUT_RETRIES: + raise UnknownException("There was a problem with the Tower API.") + else: + # Add the item back on the list for retry later on. + awaiting_runs.append(run) + + return (successful_runs, failed_runs) + + +def _is_failed_run(run: Run) -> bool: + """ + Check if the given run has failed. + + Args: + run (Run): The Run object containing the status to check. + + Returns: + bool: True if the run has failed, False otherwise. + """ + return run.status in ["crashed", "cancelled", "errored"] + + +def _is_successful_run(run: Run) -> bool: + """ + Check if a given run was successful. + + Args: + run (Run): The Run object containing the status to check. + + Returns: + bool: True if the run was successful, False otherwise. 
+ """ + return run.status in ["exited"] + + +def _is_run_awaiting_completion(run: Run) -> bool: + """ + Check if a given run is either running or expected to run in the near future. + + Args: + run (Run): The Run object containing the status to check. + + Returns: + bool: True if the run is waiting to start or currently running, False otherwise. + """ + return run.status in ["pending", "scheduled", "running"] + + +def _env_client(ctx: TowerContext, timeout: Optional[float] = None) -> AuthenticatedClient: + tower_url = ctx.tower_url + + if not tower_url.endswith("/v1"): + if tower_url.endswith("/"): + tower_url += "v1" + else: + tower_url += "/v1" + + return AuthenticatedClient( + verify_ssl=False, # NOTE(review): presumably disables TLS certificate verification; confirm this is intentional + base_url=tower_url, + token=ctx.api_key, + auth_header_name="X-API-Key", + prefix="", + timeout=timeout, + ) + + +def _time_since(start_time: float) -> float: + return time.time() - start_time + + +def _check_run_status( + ctx: TowerContext, + run: Run, + timeout: Optional[float] = 2.0, # per-request timeout, in seconds +) -> Run: + client = _env_client(ctx, timeout=timeout) + + try: output: Optional[Union[DescribeRunResponse, ErrorModel]] = describe_run_api.sync( slug=run.app_slug, seq=run.number, @@ -102,20 +332,23 @@ def wait_for_run(run: Run) -> None: ) if output is None: - raise RuntimeError("Error fetching run") - else: - if isinstance(output, ErrorModel): - raise RuntimeError(f"Error fetching run: {output.title}") + raise UnknownException("Failed to fetch run") + elif isinstance(output, ErrorModel): + # If it was a 404 error, that means that we couldn't find this + # app for some reason. This is really only relevant on the + # first time that we check--if we could find the run, but then + # suddenly couldn't that's a really big problem I'd say. + if output.status == 404: + raise NotFoundException(output.detail) + elif output.status == 401: + # NOTE: Most of the time, this shouldn't happen? 
+ raise UnauthorizedException(output.detail) else: - desc = output.run - - if desc.status == "exited": - return - elif desc.status == "failed": - return - elif desc.status == "canceled": - return - elif desc.status == "errored": - return - else: - time.sleep(WAIT_TIMEOUT) + raise UnknownException(output.detail) + else: + # There was a run object, so let's return that. + return output.run + except httpx.TimeoutException: + # If we received a timeout from the API then we should raise our own + # timeout type. + raise TimeoutException(timeout) diff --git a/src/tower/exceptions.py b/src/tower/exceptions.py new file mode 100644 index 00000000..e2a7295f --- /dev/null +++ b/src/tower/exceptions.py @@ -0,0 +1,29 @@ +class NotFoundException(Exception): + def __init__(self, message: str): + super().__init__(message) + + +class UnauthorizedException(Exception): + def __init__(self, message: str): + super().__init__(message) + + +class UnknownException(Exception): + def __init__(self, message: str): + super().__init__(message) + + +class UnhandledRunStateException(Exception): + def __init__(self, state: str): + message = f"Run state '{state}' was unexpected. Maybe you need to upgrade to the latest Tower SDK." 
+ super().__init__(message) + + +class TimeoutException(Exception): + def __init__(self, time: float): + super().__init__(f"A timeout occurred after {time} seconds.") + + +class RunFailedError(RuntimeError): + def __init__(self, app_name: str, number: int, state: str): + super().__init__(f"Run {app_name}#{number} failed with status '{state}'") diff --git a/tests/tower/test_client.py b/tests/tower/test_client.py index e9a75c70..734a17d0 100644 --- a/tests/tower/test_client.py +++ b/tests/tower/test_client.py @@ -1,121 +1,324 @@ - import os -import httpx import pytest +from datetime import datetime +from typing import List, Dict, Any, Optional -from tower.tower_api_client.models import ( - Run, -) +from tower.tower_api_client.models import Run +from tower.exceptions import RunFailedError -def test_running_apps(httpx_mock): - # Mock the response from the API - httpx_mock.add_response( - method="POST", - url="https://api.example.com/v1/apps/my-app/runs", - json={ + +@pytest.fixture +def mock_api_config(): + """Configure the Tower API client to use mock server.""" + os.environ["TOWER_URL"] = "https://api.example.com" + os.environ["TOWER_API_KEY"] = "abc123" + + # Only import after environment is configured + import tower + # Set WAIT_TIMEOUT to 0 to avoid actual waiting in tests + tower._client.WAIT_TIMEOUT = 0 + + return tower + + +@pytest.fixture +def mock_run_response_factory(): + """Factory to create consistent run response objects.""" + def _create_run_response( + app_slug: str = "my-app", + app_version: str = "v6", + number: int = 0, + run_id: str = "50ac9bc1-c783-4359-9917-a706f20dc02c", + status: str = "pending", + status_group: str = "", + parameters: Optional[List[Dict[str, Any]]] = None + ) -> Dict[str, Any]: + """Create a mock run response with the given parameters.""" + if parameters is None: + parameters = [] + + return { "run": { - "app_slug": "my-app", - "app_version": "v6", + "app_slug": app_slug, + "app_version": app_version, "cancelled_at": None, - 
"created_at": "2025-04-25T20:54:58.762547Z", - "ended_at": "2025-04-25T20:55:35.220295Z", - "environment": "default", - "number": 0, - "run_id": "50ac9bc1-c783-4359-9917-a706f20dc02c", + "created_at": "2025-04-25T20:54:58.762547Z", + "ended_at": "2025-04-25T20:55:35.220295Z", + "environment": "default", + "number": number, + "run_id": run_id, "scheduled_at": "2025-04-25T20:54:58.761867Z", - "started_at": "2025-04-25T20:54:59.366937Z", - "status": "pending", - "status_group": "", - "parameters": [] + "started_at": "2025-04-25T20:54:59.366937Z", + "status": status, + "status_group": status_group, + "parameters": parameters } - }, + } + + return _create_run_response + + +@pytest.fixture +def create_run_object(): + """Factory to create Run objects for testing.""" + def _create_run( + app_slug: str = "my-app", + app_version: str = "v6", + number: int = 0, + run_id: str = "50ac9bc1-c783-4359-9917-a706f20dc02c", + status: str = "running", + status_group: str = "failed", + parameters: Optional[List[Dict[str, Any]]] = None + ) -> Run: + """Create a Run object with the given parameters.""" + if parameters is None: + parameters = [] + + return Run( + app_slug=app_slug, + app_version=app_version, + cancelled_at=None, + created_at="2025-04-25T20:54:58.762547Z", + ended_at="2025-04-25T20:55:35.220295Z", + environment="default", + number=number, + run_id=run_id, + scheduled_at="2025-04-25T20:54:58.761867Z", + started_at="2025-04-25T20:54:59.366937Z", + status=status, + status_group=status_group, + parameters=parameters + ) + + return _create_run + + +def test_running_apps(httpx_mock, mock_api_config, mock_run_response_factory): + # Mock the response from the API + httpx_mock.add_response( + method="POST", + url="https://api.example.com/v1/apps/my-app/runs", + json=mock_run_response_factory(), status_code=200, ) - # We tell the client to use the mock server. 
- os.environ["TOWER_URL"] = "https://api.example.com" - os.environ["TOWER_API_KEY"] = "abc123" - # Call the function that makes the API request - import tower + tower = mock_api_config run: Run = tower.run_app("my-app", environment="production") # Assert the response assert run is not None + assert run.app_slug == "my-app" + assert run.status == "pending" -def test_waiting_for_runs(httpx_mock): - # Mock the response from the API + +def test_waiting_for_a_run(httpx_mock, mock_api_config, mock_run_response_factory, create_run_object): + run_number = 3 + + # First response: pending status httpx_mock.add_response( method="GET", - url="https://api.example.com/v1/apps/my-app/runs/3", - json={ - "run": { - "app_slug": "my-app", - "app_version": "v6", - "cancelled_at": None, - "created_at": "2025-04-25T20:54:58.762547Z", - "ended_at": "2025-04-25T20:55:35.220295Z", - "environment": "default", - "number": 3, - "run_id": "50ac9bc1-c783-4359-9917-a706f20dc02c", - "scheduled_at": "2025-04-25T20:54:58.761867Z", - "started_at": "2025-04-25T20:54:59.366937Z", - "status": "pending", - "status_group": "", - "parameters": [] - } - }, + url=f"https://api.example.com/v1/apps/my-app/runs/{run_number}", + json=mock_run_response_factory(number=run_number, status="pending"), status_code=200, ) - # Second request, will indicate that it's done. 
+ # Second response: completed status httpx_mock.add_response( method="GET", - url="https://api.example.com/v1/apps/my-app/runs/3", - json={ - "run": { - "app_slug": "my-app", - "app_version": "v6", - "cancelled_at": None, - "created_at": "2025-04-25T20:54:58.762547Z", - "ended_at": "2025-04-25T20:55:35.220295Z", - "environment": "default", - "number": 3, - "run_id": "50ac9bc1-c783-4359-9917-a706f20dc02c", - "scheduled_at": "2025-04-25T20:54:58.761867Z", - "started_at": "2025-04-25T20:54:59.366937Z", - "status": "exited", - "status_group": "successful", - "parameters": [] - } - }, + url=f"https://api.example.com/v1/apps/my-app/runs/{run_number}", + json=mock_run_response_factory(number=run_number, status="exited", status_group="successful"), status_code=200, ) - # We tell the client to use the mock server. - os.environ["TOWER_URL"] = "https://api.example.com" - os.environ["TOWER_API_KEY"] = "abc123" + tower = mock_api_config + run = create_run_object(number=run_number, status="crashed") - import tower + # Now actually wait for the run + final_run = tower.wait_for_run(run) + + # Verify the final state + assert final_run.status == "exited" + assert final_run.status_group == "successful" - run = Run( - app_slug="my-app", - app_version="v6", - cancelled_at=None, - created_at="2025-04-25T20:54:58.762547Z", - ended_at="2025-04-25T20:55:35.220295Z", - environment="default", - number=3, - run_id="50ac9bc1-c783-4359-9917-a706f20dc02c", - scheduled_at="2025-04-25T20:54:58.761867Z", - started_at="2025-04-25T20:54:59.366937Z", - status="crashed", - status_group="failed", - parameters=[] + +@pytest.mark.parametrize("run_numbers", [(3, 4)]) +def test_waiting_for_multiple_runs( + httpx_mock, + mock_api_config, + mock_run_response_factory, + create_run_object, + run_numbers +): + tower = mock_api_config + runs = [] + + # Setup mocks for each run + for run_number in run_numbers: + # First response: pending status + httpx_mock.add_response( + method="GET", + 
url=f"https://api.example.com/v1/apps/my-app/runs/{run_number}", + json=mock_run_response_factory(number=run_number, status="pending"), + status_code=200, + ) + + # Second response: completed status + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/{run_number}", + json=mock_run_response_factory(number=run_number, status="exited", status_group="successful"), + status_code=200, + ) + + # Create the Run object + runs.append(create_run_object(number=run_number)) + + # Now actually wait for the runs + successful_runs, failed_runs = tower.wait_for_runs(runs) + + assert len(failed_runs) == 0 + + # Verify all runs completed successfully + for run in successful_runs: + assert run.status == "exited" + assert run.status_group == "successful" + + +def test_failed_runs_in_the_list( + httpx_mock, + mock_api_config, + mock_run_response_factory, + create_run_object +): + tower = mock_api_config + runs = [] + + # For the first run, we're going to simulate a success. + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/1", + json=mock_run_response_factory(number=1, status="pending"), + status_code=200, ) - # Set WAIT_TIMEOUT to 0 so we don't have to...wait. - tower._client.WAIT_TIMEOUT = 0 + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/1", + json=mock_run_response_factory(number=1, status="exited", status_group="successful"), + status_code=200, + ) + + runs.append(create_run_object(number=1)) + + # Second run will have been a failure. 
+ httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/2", + json=mock_run_response_factory(number=2, status="pending"), + status_code=200, + ) + + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/2", + json=mock_run_response_factory(number=2, status="crashed", status_group="failed"), + status_code=200, + ) + + runs.append(create_run_object(number=2)) + + # Third run was a success. + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/3", + json=mock_run_response_factory(number=3, status="pending"), + status_code=200, + ) + + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/3", + json=mock_run_response_factory(number=3, status="exited", status_group="successful"), + status_code=200, + ) + + runs.append(create_run_object(number=3)) + + + # Now actually wait for the runs + successful_runs, failed_runs = tower.wait_for_runs(runs) + + assert len(failed_runs) == 1 + + # Verify all successful runs + for run in successful_runs: + assert run.status == "exited" + assert run.status_group == "successful" + + # Verify all failed + for run in failed_runs: + assert run.status == "crashed" + assert run.status_group == "failed" + + +def test_raising_an_error_during_partial_failure( + httpx_mock, + mock_api_config, + mock_run_response_factory, + create_run_object +): + tower = mock_api_config + runs = [] + + # For the first run, we're going to simulate a success. 
+ httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/1", + json=mock_run_response_factory(number=1, status="pending"), + status_code=200, + ) + + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/1", + json=mock_run_response_factory(number=1, status="exited", status_group="successful"), + status_code=200, + ) + + runs.append(create_run_object(number=1)) + + # Second run will have been a failure. + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/2", + json=mock_run_response_factory(number=2, status="pending"), + status_code=200, + ) + + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/2", + json=mock_run_response_factory(number=2, status="crashed", status_group="failed"), + status_code=200, + ) + + runs.append(create_run_object(number=2)) + + # Third run was a success. + httpx_mock.add_response( + method="GET", + url=f"https://api.example.com/v1/apps/my-app/runs/3", + json=mock_run_response_factory(number=3, status="pending"), + status_code=200, + ) + + # NOTE: We don't have a second response for this run because we'll never + # get to it. + + runs.append(create_run_object(number=3)) + - # Now actually wait for the run. - tower.wait_for_run(run) + # Now actually wait for the runs + with pytest.raises(RunFailedError) as excinfo: + tower.wait_for_runs(runs, raise_on_failure=True)