diff --git a/streamrip/client/deezer.py b/streamrip/client/deezer.py index 056463f9..94f2a999 100644 --- a/streamrip/client/deezer.py +++ b/streamrip/client/deezer.py @@ -3,7 +3,10 @@ import hashlib import logging +import re + import deezer +from deezer.errors import DataException from Cryptodome.Cipher import AES from ..config import Config @@ -35,12 +38,25 @@ class DeezerClient(Client): max_quality = 2 def __init__(self, config: Config): + """ + Initializes the DeezerClient. + + Args: + config (Config): The configuration object. + """ self.global_config = config self.client = deezer.Deezer() self.logged_in = False self.config = config.session.deezer async def login(self): + """ + Logs in to Deezer using the ARL (Authentication Request Login) token. + + Raises: + MissingCredentialsError: If the ARL is missing from the config. + AuthenticationError: If login fails. + """ # Used for track downloads self.session = await self.get_session( verify_ssl=self.global_config.session.downloads.verify_ssl @@ -54,6 +70,19 @@ async def login(self): self.logged_in = True async def get_metadata(self, item_id: str, media_type: str) -> dict: + """ + Fetches metadata for a given item. + + Args: + item_id (str): The ID of the item. + media_type (str): The type of media (track, album, playlist, artist). + + Returns: + dict: The metadata of the item. + + Raises: + Exception: If the media type is invalid. + """ # TODO: open asyncio PR to deezer py and integrate if media_type == "track": return await self.get_track(item_id) @@ -67,6 +96,18 @@ async def get_metadata(self, item_id: str, media_type: str) -> dict: raise Exception(f"Media type {media_type} not available on deezer") async def get_track(self, item_id: str) -> dict: + """ + Fetches metadata for a track. + + Args: + item_id (str): The track ID. + + Returns: + dict: The track metadata. + + Raises: + NonStreamableError: If the track is not streamable. + """ try: item = await asyncio.to_thread(self.client.api.get_track, item_id) except Exception as e: @@ -74,47 +115,133 @@ async def get_track(self, item_id: str) -> dict: album_id = item["album"]["id"] try: - album_metadata, album_tracks = await asyncio.gather( - asyncio.to_thread(self.client.api.get_album, album_id), - asyncio.to_thread(self.client.api.get_album_tracks, album_id), - ) + # reuse get_album to handle redirects + album_metadata = await self.get_album(str(album_id)) except Exception as e: logger.error(f"Error fetching album of track {item_id}: {e}") return item - album_metadata["tracks"] = album_tracks["data"] - album_metadata["track_total"] = len(album_tracks["data"]) item["album"] = album_metadata return item async def get_album(self, item_id: str) -> dict: - album_metadata, album_tracks = await asyncio.gather( - asyncio.to_thread(self.client.api.get_album, item_id), - asyncio.to_thread(self.client.api.get_album_tracks, item_id), - ) + """ + Fetches metadata for an album. + + Args: + item_id (str): The album ID. + + Returns: + dict: The album metadata. + """ + try: + album_metadata, album_tracks = await asyncio.gather( + asyncio.to_thread(self.client.api.get_album, item_id), + asyncio.to_thread(self.client.api.get_album_tracks, item_id), + ) + except DataException: + new_id = await self._resolve_redirect("album", item_id) + if new_id: + return await self.get_album(new_id) + raise + album_metadata["tracks"] = album_tracks["data"] album_metadata["track_total"] = len(album_tracks["data"]) return album_metadata + async def _resolve_redirect(self, media_type: str, item_id: str) -> str | None: + """ + Resolves potential Deezer redirects to find the actual item ID. + + Returns: + The new ID if a redirect occurred, otherwise None. + """ + url = f"https://www.deezer.com/{media_type}/{item_id}" + + try: + # Perform a HEAD request to follow redirects without downloading content + async with self.session.head(url, allow_redirects=True) as response: + final_url = str(response.url) + except Exception as e: + logger.warning(f"Failed to resolve redirect for {item_id}: {e}") + return None + + # If the URL hasn't changed, return early + if final_url == url: + return None + + # Attempt to extract the new ID from the final URL + match = re.search(rf"/{media_type}/(\d+)", final_url) + + # If a valid new ID is found and it differs from the original + if match and (new_id := match.group(1)) != item_id: + logger.info(f"Resolved redirect for {media_type} {item_id} -> {new_id}") + return new_id + + return None + async def get_playlist(self, item_id: str) -> dict: - pl_metadata, pl_tracks = await asyncio.gather( - asyncio.to_thread(self.client.api.get_playlist, item_id), - asyncio.to_thread(self.client.api.get_playlist_tracks, item_id), - ) + """ + Fetches metadata for a playlist. + + Args: + item_id (str): The playlist ID. + + Returns: + dict: The playlist metadata. + """ + try: + pl_metadata, pl_tracks = await asyncio.gather( + asyncio.to_thread(self.client.api.get_playlist, item_id), + asyncio.to_thread(self.client.api.get_playlist_tracks, item_id), + ) + except DataException: + new_id = await self._resolve_redirect("playlist", item_id) + if new_id: + return await self.get_playlist(new_id) + raise + pl_metadata["tracks"] = pl_tracks["data"] pl_metadata["track_total"] = len(pl_tracks["data"]) return pl_metadata async def get_artist(self, item_id: str) -> dict: - artist, albums = await asyncio.gather( - asyncio.to_thread(self.client.api.get_artist, item_id), - asyncio.to_thread(self.client.api.get_artist_albums, item_id), - ) + """ + Fetches metadata for an artist. + + Args: + item_id (str): The artist ID. + + Returns: + dict: The artist metadata. + """ + try: + artist, albums = await asyncio.gather( + asyncio.to_thread(self.client.api.get_artist, item_id), + asyncio.to_thread(self.client.api.get_artist_albums, item_id), + ) + except DataException: + new_id = await self._resolve_redirect("artist", item_id) + if new_id: + return await self.get_artist(new_id) + raise + artist["albums"] = albums["data"] return artist async def search(self, media_type: str, query: str, limit: int = 200) -> list[dict]: + """ + Searches for items on Deezer. + + Args: + media_type (str): The type of media to search for. + query (str): The search query. + limit (int): The maximum number of results to return. + + Returns: + list[dict]: A list of search results. + """ # TODO: use limit parameter if media_type == "featured": try: @@ -141,73 +268,84 @@ async def get_downloadable( quality: int = 2, is_retry: bool = False, ) -> DeezerDownloadable: + """ + Prepares a downloadable object for a track. + + Args: + item_id (str): The track ID. + quality (int): The desired quality (0, 1, or 2). + is_retry (bool): whether this is a retry attempt (internal use). + + Returns: + DeezerDownloadable: The downloadable object. + + Raises: + NonStreamableError: If the track cannot be streamed. + """ if item_id is None: raise NonStreamableError( "No item id provided. This can happen when searching for fallback songs.", ) - # TODO: optimize such that all of the ids are requested at once - dl_info: dict = {"quality": quality, "id": item_id} - track_info = self.client.gw.get_track(item_id) + # Ensure quality is within bounds [0, 2] + quality = max(0, min(quality, 2)) + # TODO: optimize such that all of the ids are requested at once + track_info = self.client.gw.get_track(item_id) fallback_id = track_info.get("FALLBACK", {}).get("SNG_ID") + # Mapping internal quality levels to Deezer API formats + # We list them in descending order to facilitate the fallback loop quality_map = [ (9, "MP3_128"), # quality 0 (3, "MP3_320"), # quality 1 - (1, "FLAC"), # quality 2 - ] - size_map = [ - int(track_info.get(f"FILESIZE_{format}", 0)) for _, format in quality_map + (1, "FLAC"), # quality 2 ] - dl_info["quality_to_size"] = size_map - - # Check if requested quality is available - if size_map[quality] == 0: - if self.config.lower_quality_if_not_available: - # Fallback to lower quality - while size_map[quality] == 0 and quality > 0: - logger.warning( - "The requested quality %s is not available. Falling back to quality %s", - quality, - quality - 1, - ) - quality -= 1 - else: - # No fallback - raise error - raise NonStreamableError( - f"The requested quality {quality} is not available and fallback is disabled." - ) - - # Update the quality in dl_info to reflect the final quality used - dl_info["quality"] = quality - _, format_str = quality_map[quality] + # Pre-calculate file sizes for metadata + dl_info: dict = {"quality": quality, "id": item_id} + dl_info["quality_to_size"] = [ + int(track_info.get(f"FILESIZE_{fmt}", 0)) for _, fmt in quality_map + ] token = track_info["TRACK_TOKEN"] - try: - logger.debug("Fetching deezer url with token %s", token) - url = self.client.get_track_url(token, format_str) - except deezer.WrongLicense: - raise NonStreamableError( - "The requested quality is not available with your subscription. " - "Deezer HiFi is required for quality 2. Otherwise, the maximum " - "quality allowed is 1.", - ) - except deezer.WrongGeolocation: - if not is_retry and fallback_id: - return await self.get_downloadable(fallback_id, quality, is_retry=True) - raise NonStreamableError( - "The requested track is not available. This may be due to your country/location.", - ) + url = None + final_quality = quality + + # --- START OF FALLBACK LOOP --- + # We try from the requested quality down to 0 (MP3_128) + for q_level in range(quality, -1, -1): + _, format_str = quality_map[q_level] + + try: + logger.debug(f"Attempting to fetch URL for quality {q_level} ({format_str})") + url = self.client.get_track_url(token, format_str) + if url: + final_quality = q_level + break # Success! + except deezer.WrongLicense: + logger.warning(f"Quality {q_level} not available for this account. Trying lower...") + continue + except deezer.WrongGeolocation: + if not is_retry and fallback_id: + logger.info(f"Geoblocked. Trying fallback ID: {fallback_id}") + return await self.get_downloadable(fallback_id, quality, is_retry=True) + raise NonStreamableError("Track geoblocked and no fallback available.") + # --- END OF FALLBACK LOOP --- + # If no URL was found through the official API, try the legacy encrypted method if url is None: + logger.debug("Official API failed, trying encrypted file URL.") url = self._get_encrypted_file_url( item_id, track_info["MD5_ORIGIN"], track_info["MEDIA_VERSION"], ) + if not url: + raise NonStreamableError(f"Could not retrieve any download URL for track {item_id}") + + dl_info["quality"] = final_quality dl_info["url"] = url logger.debug("dz track info: %s", track_info) return DeezerDownloadable(self.session, dl_info) @@ -218,6 +356,17 @@ def _get_encrypted_file_url( track_hash: str, media_version: str, ): + """ + Generates an encrypted file URL for a track when the standard API fails. + + Args: + meta_id (str): The metadata ID. + track_hash (str): The track hash. + media_version (str): The media version. + + Returns: + str: The encrypted file URL. + """ logger.debug("Unable to fetch URL. Trying encryption method.") format_number = 1