Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions cdp/cdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ class Cdp:
base_path (str): The base URL for the Platform API.
max_network_retries (int): The maximum number of network retries.
api_clients (Optional[ApiClients]): The Platform API clients instance.

"""

_instance = None
Expand All @@ -35,13 +34,8 @@ class Cdp:
def __new__(cls):
"""Create or return the singleton instance of the Cdp class.

This method overrides the default `__new__` behavior to implement the Singleton pattern.
It ensures that only one instance of the Cdp class exists throughout the application's lifecycle.
If an instance already exists, it returns the existing instance; otherwise, it creates a new one.

Returns:
Cdp: The singleton instance of the Cdp class.

"""
if cls._instance is None:
cls._instance = super().__new__(cls)
Expand Down Expand Up @@ -70,7 +64,6 @@ def configure(
max_network_retries (int): The maximum number of network retries. Defaults to 3.
source (Optional[str]): Specifies whether the sdk is being used directly or if it's an Agentkit extension.
source_version (Optional[str]): The version of the source package.

"""
cls.api_key_name = api_key_name
cls.private_key = private_key
Expand Down Expand Up @@ -103,6 +96,8 @@ def configure_from_json(
) -> None:
"""Configure the CDP SDK from a JSON file.

This updated method now accepts either "name" or "id" as the API key identifier.

Args:
file_path (str): The path to the JSON file. Defaults to "~/Downloads/cdp_api_key.json".
use_server_signer (bool): Whether to use the server signer. Defaults to False.
Expand All @@ -113,17 +108,19 @@ def configure_from_json(
source_version (Optional[str]): The version of the source package.

Raises:
InvalidConfigurationError: If the JSON file is missing the 'api_key_name' or 'private_key'.

InvalidConfigurationError: If the JSON file is missing the API key identifier or the private key.
"""
with open(os.path.expanduser(file_path)) as file:
data = json.load(file)
api_key_name = data.get("name")
# Accept either "name" or "id" for the API key identifier.
api_key_name = data.get("name") or data.get("id")
private_key = data.get("privateKey")
if not api_key_name:
raise InvalidConfigurationError("Invalid JSON format: Missing 'api_key_name'")
raise InvalidConfigurationError(
"Invalid JSON format: Missing API key identifier ('name' or 'id')"
)
if not private_key:
raise InvalidConfigurationError("Invalid JSON format: Missing 'private_key'")
raise InvalidConfigurationError("Invalid JSON format: Missing 'privateKey'")
cls.configure(
api_key_name,
private_key,
Expand Down
97 changes: 51 additions & 46 deletions cdp/cdp_api_client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import base64
import random
import time
from urllib.parse import urlparse

import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import ec, ed25519
from urllib3.util import Retry

from cdp import __version__
Expand Down Expand Up @@ -36,12 +37,15 @@ def __init__(
Args:
api_key (str): The API key for authentication.
private_key (str): The private key for authentication.
For ECDSA keys, this should be a PEM-encoded string.
For Ed25519 keys, this should be a base64-encoded string representing
either the raw 32-byte seed or a 64-byte key (private+public), in which
case only the first 32 bytes are used.
host (str, optional): The base URL for the API. Defaults to "https://api.cdp.coinbase.com/platform".
debugging (bool): Whether debugging is enabled.
max_network_retries (int): The maximum number of network retries. Defaults to 3.
source (str): Specifies whether the sdk is being used directly or if it's an Agentkit extension.
source (str): Specifies whether the SDK is being used directly or if it's an Agentkit extension.
source_version (str): The version of the source package.

"""
retry_strategy = self._get_retry_strategy(max_network_retries)
configuration = Configuration(host=host, retries=retry_strategy)
Expand All @@ -58,7 +62,6 @@ def api_key(self) -> str:

Returns:
str: The API key.

"""
return self._api_key

Expand All @@ -68,17 +71,15 @@ def private_key(self) -> str:

Returns:
str: The private key.

"""
return self._private_key

@property
def debugging(self) -> str:
def debugging(self) -> bool:
"""Whether debugging is enabled.

Returns:
bool: Whether debugging is enabled.

"""
return self._debugging

Expand All @@ -96,18 +97,15 @@ def call_api(
Args:
method: Method to call.
url: Path to method endpoint.
header_params: Header parameters to be
placed in the request header.
header_params: Header parameters to be placed in the request header.
body: Request body.
post_params (dict): Request post form parameters,
for `application/x-www-form-urlencoded`, `multipart/form-data`.
_request_timeout: timeout setting for this request.
post_params (dict): Request post form parameters.
_request_timeout: Timeout setting for this request.

Returns:
RESTResponse

"""
if self.debugging is True:
if self.debugging:
print(f"CDP API REQUEST: {method} {url}")

if header_params is None:
Expand All @@ -130,9 +128,8 @@ def response_deserialize(

Returns:
ApiResponse[ApiResponseT]

"""
if self.debugging is True:
if self.debugging:
print(f"CDP API RESPONSE: Status: {response_data.status}, Data: {response_data.data}")

try:
Expand All @@ -141,23 +138,15 @@ def response_deserialize(
raise ApiError.from_error(e) from None

def _apply_headers(self, url: str, method: str, header_params: dict[str, str]) -> None:
"""Apply authentication to the configuration.
"""Apply authentication headers.

Args:
url (str): The URL to authenticate.
method (str): The HTTP method to use.
header_params (dict[str, str]): The header parameters.

Returns:
None

"""
token = self._build_jwt(url, method)

# Add the JWT token to the headers
header_params["Authorization"] = f"Bearer {token}"

# Add additional custom headers
header_params["Content-Type"] = "application/json"
header_params["Correlation-Context"] = self._get_correlation_data()

Expand All @@ -170,19 +159,38 @@ def _build_jwt(self, url: str, method: str = "GET") -> str:

Returns:
str: The JWT for the given API endpoint URL.

"""
private_key_obj = None
key_data = self.private_key.encode()
# Business change: Support both ECDSA and Ed25519 keys.
try:
private_key = serialization.load_pem_private_key(
self.private_key.encode(), password=None
)
if not isinstance(private_key, ec.EllipticCurvePrivateKey):
raise InvalidAPIKeyFormatError("Invalid key type")
except Exception as e:
raise InvalidAPIKeyFormatError("Could not parse the private key") from e
# Try loading as a PEM-encoded key (typically for ECDSA keys).
private_key_obj = serialization.load_pem_private_key(key_data, password=None)
except Exception:
# If PEM loading fails, assume the key is provided as base64-encoded raw bytes (Ed25519).
try:
decoded_key = base64.b64decode(self.private_key)
if len(decoded_key) == 32:
private_key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded_key)
elif len(decoded_key) == 64:
private_key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded_key[:32])
else:
raise InvalidAPIKeyFormatError(
"Ed25519 private key must be 32 or 64 bytes after base64 decoding"
)
except Exception as e2:
raise InvalidAPIKeyFormatError("Could not parse the private key") from e2

# Determine signing algorithm based on the key type.
if isinstance(private_key_obj, ec.EllipticCurvePrivateKey):
alg = "ES256"
elif isinstance(private_key_obj, ed25519.Ed25519PrivateKey):
alg = "EdDSA"
else:
raise InvalidAPIKeyFormatError("Unsupported key type")

header = {
"alg": "ES256",
"alg": alg,
"kid": self.api_key,
"typ": "JWT",
"nonce": self._nonce(),
Expand All @@ -195,31 +203,29 @@ def _build_jwt(self, url: str, method: str = "GET") -> str:
"iss": "cdp",
"aud": ["cdp_service"],
"nbf": int(time.time()),
"exp": int(time.time()) + 60, # +1 minute
"exp": int(time.time()) + 60, # Token valid for 1 minute
"uris": [uri],
}

try:
return jwt.encode(claims, private_key, algorithm="ES256", headers=header)
return jwt.encode(claims, private_key_obj, algorithm=alg, headers=header)
except Exception as e:
print(f"Error during JWT signing: {e!s}")
raise InvalidAPIKeyFormatError("Could not sign the JWT") from e

def _nonce(self) -> str:
"""Generate a random nonce for the JWT.
"""Generate a random nonce.

Returns:
str: The nonce.

"""
return "".join(random.choices("0123456789", k=16))

def _get_correlation_data(self) -> str:
"""Return encoded correlation data including the SDK version, language, and source.
"""Return correlation data including SDK version, language, and source.

Returns:
str: The correlation data.

"""
data = {
"sdk_version": __version__,
Expand All @@ -230,18 +236,17 @@ def _get_correlation_data(self) -> str:
return ",".join(f"{key}={value}" for key, value in data.items())

def _get_retry_strategy(self, max_network_retries: int) -> Retry:
"""Return the retry strategy for the CDP API Client.
"""Return the retry strategy.

Args:
max_network_retries (int): The maximum number of network retries.

Returns:
Retry: The retry strategy.

"""
return Retry(
total=max_network_retries, # Number of total retries
status_forcelist=[500, 502, 503, 504], # Retry on HTTP status code 500
allowed_methods=["GET"], # Retry only on GET requests
backoff_factor=1, # Exponential backoff factor
total=max_network_retries,
status_forcelist=[500, 502, 503, 504],
allowed_methods=["GET"],
backoff_factor=1,
)
Loading
Loading