Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions ayon_api/server_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1329,7 +1329,7 @@ def delete(self, entrypoint: str, **kwargs):
def _endpoint_to_url(
self,
endpoint: str,
use_rest: Optional[bool] = True
use_rest: bool = True,
) -> str:
"""Cleanup endpoint and return full url to AYON server.

Expand All @@ -1347,25 +1347,51 @@ def _endpoint_to_url(
endpoint = endpoint.lstrip("/").rstrip("/")
if endpoint.startswith(self._base_url):
return endpoint
base_url = self._rest_url if use_rest else self._graphql_url
base_url = self._rest_url if use_rest else self._base_url
return f"{base_url}/{endpoint}"

def _download_file_to_stream(
self, url: str, stream, chunk_size, progress
self,
url: str,
stream: StreamType,
chunk_size: int,
progress: TransferProgress,
):
kwargs = {"stream": True}
headers = self.get_headers()
kwargs = {
"stream": True,
"headers": headers,
}
if self._session is None:
kwargs["headers"] = self.get_headers()
get_func = self._base_functions_mapping[RequestTypes.get]
else:
get_func = self._session_functions_mapping[RequestTypes.get]

with get_func(url, **kwargs) as response:
response.raise_for_status()
progress.set_content_size(response.headers["Content-length"])
for chunk in response.iter_content(chunk_size=chunk_size):
stream.write(chunk)
progress.add_transferred_chunk(len(chunk))
retries = self.get_default_max_retries()
for attempt in range(retries):
# Continue in download
offset = progress.get_transferred_size()
if offset > 0:
headers["Range"] = f"bytes={offset}-"

try:
with get_func(url, **kwargs) as response:
response.raise_for_status()
progress.set_content_size(
response.headers["Content-length"]
)
for chunk in response.iter_content(chunk_size=chunk_size):
stream.write(chunk)
progress.add_transferred_chunk(len(chunk))
break

except (
requests.exceptions.Timeout,
requests.exceptions.ConnectionError,
):
if attempt == retries:
raise
progress.next_attempt()

def download_file_to_stream(
self,
Expand Down Expand Up @@ -1399,7 +1425,7 @@ def download_file_to_stream(
if not chunk_size:
chunk_size = self.default_download_chunk_size

url = self._endpoint_to_url(endpoint)
url = self._endpoint_to_url(endpoint, use_rest=False)

if progress is None:
progress = TransferProgress()
Expand Down Expand Up @@ -1543,11 +1569,27 @@ def _upload_file(
if not chunk_size:
chunk_size = self.default_upload_chunk_size

response = post_func(
url,
data=self._upload_chunks_iter(stream, progress, chunk_size),
**kwargs
)
retries = self.get_default_max_retries()
response = None
for attempt in range(retries):
try:
response = post_func(
url,
data=self._upload_chunks_iter(
stream, progress, chunk_size
),
**kwargs
)
break

except (
requests.exceptions.Timeout,
requests.exceptions.ConnectionError,
):
if attempt == retries:
raise
progress.next_attempt()
progress.reset_transferred()

response.raise_for_status()
return response
Expand Down Expand Up @@ -1580,7 +1622,7 @@ def upload_file_from_stream(
requests.Response: Response object

"""
url = self._endpoint_to_url(endpoint)
url = self._endpoint_to_url(endpoint, use_rest=False)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was changed back to original value in develop after merge commit


# Create dummy object so the function does not have to check
# 'progress' variable everywhere
Expand Down
16 changes: 16 additions & 0 deletions ayon_api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ class TransferProgress:
"""Object to store progress of download/upload from/to server."""

def __init__(self):
self._attempt: int = 0
self._started: bool = False
self._transfer_done: bool = False
self._transferred: int = 0
Expand Down Expand Up @@ -850,6 +851,17 @@ def set_started(self):
if self._started:
raise ValueError("Progress already started")
self._started = True
self._attempt = 1

def get_attempt(self) -> int:
"""Find out which attempt of progress it is."""
return self._attempt

def next_attempt(self) -> None:
"""Start new attempt of progress."""
if not self._started:
raise ValueError("Progress did not start yet")
self._attempt += 1

def get_transfer_done(self) -> bool:
"""Transfer finished.
Expand Down Expand Up @@ -921,6 +933,10 @@ def set_transferred_size(self, transferred: int):
"""
self._transferred = transferred

def reset_transferred(self) -> None:
"""Reset transferred size to initial value."""
self._transferred = 0

def add_transferred_chunk(self, chunk_size: int):
"""Add transferred chunk size in bytes.

Expand Down