diff --git a/ayon_api/server_api.py b/ayon_api/server_api.py index 292cdfdfe..e2d8524e3 100644 --- a/ayon_api/server_api.py +++ b/ayon_api/server_api.py @@ -1329,7 +1329,7 @@ def delete(self, entrypoint: str, **kwargs): def _endpoint_to_url( self, endpoint: str, - use_rest: Optional[bool] = True + use_rest: bool = True, ) -> str: """Cleanup endpoint and return full url to AYON server. @@ -1347,25 +1347,51 @@ def _endpoint_to_url( endpoint = endpoint.lstrip("/").rstrip("/") if endpoint.startswith(self._base_url): return endpoint - base_url = self._rest_url if use_rest else self._graphql_url + base_url = self._rest_url if use_rest else self._base_url return f"{base_url}/{endpoint}" def _download_file_to_stream( - self, url: str, stream, chunk_size, progress + self, + url: str, + stream: StreamType, + chunk_size: int, + progress: TransferProgress, ): - kwargs = {"stream": True} + headers = self.get_headers() + kwargs = { + "stream": True, + "headers": headers, + } if self._session is None: - kwargs["headers"] = self.get_headers() get_func = self._base_functions_mapping[RequestTypes.get] else: get_func = self._session_functions_mapping[RequestTypes.get] - with get_func(url, **kwargs) as response: - response.raise_for_status() - progress.set_content_size(response.headers["Content-length"]) - for chunk in response.iter_content(chunk_size=chunk_size): - stream.write(chunk) - progress.add_transferred_chunk(len(chunk)) + retries = self.get_default_max_retries() + for attempt in range(retries): + # Continue in download + offset = progress.get_transferred_size() + if offset > 0: + headers["Range"] = f"bytes={offset}-" + + try: + with get_func(url, **kwargs) as response: + response.raise_for_status() + progress.set_content_size( + response.headers["Content-length"] + ) + for chunk in response.iter_content(chunk_size=chunk_size): + stream.write(chunk) + progress.add_transferred_chunk(len(chunk)) + break + + except ( + requests.exceptions.Timeout, + requests.exceptions.ConnectionError, + ): + if attempt == retries: + raise + progress.next_attempt() def download_file_to_stream( self, @@ -1399,7 +1425,7 @@ def download_file_to_stream( if not chunk_size: chunk_size = self.default_download_chunk_size - url = self._endpoint_to_url(endpoint) + url = self._endpoint_to_url(endpoint, use_rest=False) if progress is None: progress = TransferProgress() @@ -1543,11 +1569,27 @@ def _upload_file( if not chunk_size: chunk_size = self.default_upload_chunk_size - response = post_func( - url, - data=self._upload_chunks_iter(stream, progress, chunk_size), - **kwargs - ) + retries = self.get_default_max_retries() + response = None + for attempt in range(retries): + try: + response = post_func( + url, + data=self._upload_chunks_iter( + stream, progress, chunk_size + ), + **kwargs + ) + break + + except ( + requests.exceptions.Timeout, + requests.exceptions.ConnectionError, + ): + if attempt == retries: + raise + progress.next_attempt() + progress.reset_transferred() response.raise_for_status() return response @@ -1580,7 +1622,7 @@ def upload_file_from_stream( requests.Response: Response object """ - url = self._endpoint_to_url(endpoint) + url = self._endpoint_to_url(endpoint, use_rest=False) # Create dummy object so the function does not have to check # 'progress' variable everywhere diff --git a/ayon_api/utils.py b/ayon_api/utils.py index baf271277..49917c7bb 100644 --- a/ayon_api/utils.py +++ b/ayon_api/utils.py @@ -796,6 +796,7 @@ class TransferProgress: """Object to store progress of download/upload from/to server.""" def __init__(self): + self._attempt: int = 0 self._started: bool = False self._transfer_done: bool = False self._transferred: int = 0 @@ -850,6 +851,17 @@ def set_started(self): if self._started: raise ValueError("Progress already started") self._started = True + self._attempt = 1 + + def get_attempt(self) -> int: + """Find out which attempt of progress it is.""" + return self._attempt + + def next_attempt(self) -> None: + """Start new attempt of progress.""" + if not self._started: + raise ValueError("Progress did not start yet") + self._attempt += 1 def get_transfer_done(self) -> bool: """Transfer finished. @@ -921,6 +933,10 @@ def set_transferred_size(self, transferred: int): """ self._transferred = transferred + def reset_transferred(self) -> None: + """Reset transferred size to initial value.""" + self._transferred = 0 + def add_transferred_chunk(self, chunk_size: int): """Add transferred chunk size in bytes.