From e2725c32bf9488aa50d3f0ca00e835125aeabb2e Mon Sep 17 00:00:00 2001 From: Di Mei Date: Sat, 22 Feb 2025 16:59:44 -0500 Subject: [PATCH] support Edwards key --- cdp/cdp.py | 2 +- cdp/cdp_api_client.py | 43 ++++++++++++++++++++++++++++++++----------- cdp/wallet.py | 40 ++++++++++++++++++++++++++++++++-------- 3 files changed, 65 insertions(+), 20 deletions(-) diff --git a/cdp/cdp.py b/cdp/cdp.py index 84c2416..f7a62d9 100644 --- a/cdp/cdp.py +++ b/cdp/cdp.py @@ -118,7 +118,7 @@ def configure_from_json( """ with open(os.path.expanduser(file_path)) as file: data = json.load(file) - api_key_name = data.get("name") + api_key_name = data.get("name") or data.get("id") private_key = data.get("privateKey") if not api_key_name: raise InvalidConfigurationError("Invalid JSON format: Missing 'api_key_name'") diff --git a/cdp/cdp_api_client.py b/cdp/cdp_api_client.py index 15a7d2f..fd4aa05 100644 --- a/cdp/cdp_api_client.py +++ b/cdp/cdp_api_client.py @@ -1,10 +1,11 @@ +import base64 import random import time from urllib.parse import urlparse import jwt from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import ec, ed25519 from urllib3.util import Retry from cdp import __version__ @@ -172,17 +173,37 @@ def _build_jwt(self, url: str, method: str = "GET") -> str: str: The JWT for the given API endpoint URL. """ + private_key_obj = None + key_data = self.private_key.encode() + # Business change: Support both ECDSA and Ed25519 keys. try: - private_key = serialization.load_pem_private_key( - self.private_key.encode(), password=None - ) - if not isinstance(private_key, ec.EllipticCurvePrivateKey): - raise InvalidAPIKeyFormatError("Invalid key type") - except Exception as e: - raise InvalidAPIKeyFormatError("Could not parse the private key") from e + # Try loading as a PEM-encoded key (typically for ECDSA keys). + private_key_obj = serialization.load_pem_private_key(key_data, password=None) + except Exception: + # If PEM loading fails, assume the key is provided as base64-encoded raw bytes (Ed25519). + try: + decoded_key = base64.b64decode(self.private_key) + if len(decoded_key) == 32: + private_key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded_key) + elif len(decoded_key) == 64: + private_key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded_key[:32]) + else: + raise InvalidAPIKeyFormatError( + "Ed25519 private key must be 32 or 64 bytes after base64 decoding" + ) + except Exception as e2: + raise InvalidAPIKeyFormatError("Could not parse the private key") from e2 + + # Determine signing algorithm based on the key type. + if isinstance(private_key_obj, ec.EllipticCurvePrivateKey): + alg = "ES256" + elif isinstance(private_key_obj, ed25519.Ed25519PrivateKey): + alg = "EdDSA" + else: + raise InvalidAPIKeyFormatError("Unsupported key type") header = { - "alg": "ES256", + "alg": alg, "kid": self.api_key, "typ": "JWT", "nonce": self._nonce(), @@ -195,12 +216,12 @@ def _build_jwt(self, url: str, method: str = "GET") -> str: "iss": "cdp", "aud": ["cdp_service"], "nbf": int(time.time()), - "exp": int(time.time()) + 60, # +1 minute + "exp": int(time.time()) + 60, # Token valid for 1 minute "uris": [uri], } try: - return jwt.encode(claims, private_key, algorithm="ES256", headers=header) + return jwt.encode(claims, private_key_obj, algorithm=alg, headers=header) except Exception as e: print(f"Error during JWT signing: {e!s}") raise InvalidAPIKeyFormatError("Could not sign the JWT") from e diff --git a/cdp/wallet.py b/cdp/wallet.py index 5707197..107d10e 100644 --- a/cdp/wallet.py +++ b/cdp/wallet.py @@ -1,3 +1,4 @@ +import base64 import builtins import hashlib import json @@ -12,7 +13,7 @@ from bip_utils import Bip32Slip10Secp256k1, Bip39MnemonicValidator, Bip39SeedGenerator from Crypto.Cipher import AES from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric import ec, ed25519 from eth_account import Account from cdp.address import Address @@ -692,13 +693,36 @@ def _encryption_key(self) -> bytes: bytes: The generated encryption key. """ - private_key = serialization.load_pem_private_key(Cdp.private_key.encode(), password=None) - - public_key = private_key.public_key() - - shared_secret = private_key.exchange(ec.ECDH(), public_key) - - return hashlib.sha256(shared_secret).digest() + try: + key_obj = serialization.load_pem_private_key(Cdp.private_key.encode(), password=None) + except Exception: + # If PEM loading fails, assume the key is provided as a base64-encoded Ed25519 key. + try: + decoded = base64.b64decode(Cdp.private_key) + if len(decoded) == 32: + key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded) + elif len(decoded) == 64: + key_obj = ed25519.Ed25519PrivateKey.from_private_bytes(decoded[:32]) + else: + raise ValueError("Invalid Ed25519 key length") + except Exception as e2: + raise ValueError("Could not parse the private key") from e2 + + # For ECDSA keys, perform an ECDH exchange with its own public key. + if isinstance(key_obj, ec.EllipticCurvePrivateKey): + public_key = key_obj.public_key() + shared_secret = key_obj.exchange(ec.ECDH(), public_key) + return hashlib.sha256(shared_secret).digest() + # For Ed25519 keys, derive the encryption key by hashing the raw private key bytes. + elif isinstance(key_obj, ed25519.Ed25519PrivateKey): + raw_bytes = key_obj.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) + return hashlib.sha256(raw_bytes).digest() + else: + raise ValueError("Unsupported key type for encryption key derivation") def _existing_seeds(self, file_path: str) -> dict[str, Any]: """Load existing seeds from a file.