diff --git a/README.md b/README.md index a752a5d1..dfdcf193 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ The Python Evolve SDK contains everything necessary to communicate with a [Zepbe # Requirements # -- Python 3.9 or later +- Python 3.10 or later # Installation # diff --git a/changelog.md b/changelog.md index e5980342..9c7ee4b7 100644 --- a/changelog.md +++ b/changelog.md @@ -10,7 +10,7 @@ * None. ### Fixes -* None. +* Moved ZepbenTokenAuth to use python dataclasses instead of `zepben.ewb.dataclassy`, existing code should work as is. ### Notes * None. diff --git a/src/zepben/ewb/auth/client/zepben_token_fetcher.py b/src/zepben/ewb/auth/client/zepben_token_fetcher.py index 821bedce..9c004ccc 100644 --- a/src/zepben/ewb/auth/client/zepben_token_fetcher.py +++ b/src/zepben/ewb/auth/client/zepben_token_fetcher.py @@ -7,12 +7,13 @@ __all__ = ["ZepbenTokenFetcher", "create_token_fetcher", "get_token_fetcher", "create_token_fetcher_managed_identity"] import warnings +from dataclasses import dataclass, Field, field, InitVar from datetime import datetime -from typing import Optional, Union, Callable, Dict +from typing import Optional, Callable import jwt import requests -from dataclassy import dataclass +from requests import Response from urllib3.exceptions import InsecureRequestWarning from zepben.ewb.auth.common.auth_exception import AuthException @@ -21,11 +22,15 @@ from zepben.ewb.auth.common.auth_provider_config import AuthProviderConfig, create_auth_provider_config, fetch_provider_details -def _fetch_token_generator(is_entraid: bool, use_identity: bool, identity_url: Optional[str] = None) -> Callable[ - [Dict, Dict, str, bool, bool], requests.Response]: +def _fetch_token_generator( + is_entraid: bool, + use_identity: bool, + identity_url: Optional[str] = None +) -> Callable[[dict, dict, str, Optional[bool], Optional[bool]], Response]: + def post( - refresh_request_data: Dict, - token_request_data: Dict, + refresh_request_data: dict, + token_request_data: dict, token_endpoint: str, refresh: bool, verify: bool @@ -48,7 +53,14 @@ def post( verify=verify ) - def _get_token_response(refresh_request_data: Dict, token_request_data: Dict, token_endpoint: str, refresh: bool, verify: bool) -> requests.Response: + def _get_token_response( + refresh_request_data: dict, + token_request_data: dict, + token_endpoint: str, + refresh: bool, + verify: bool + ) -> requests.Response: + refresh = not is_entraid and refresh # At the moment Azure auth doesn't support refresh tokens. So we always force new tokens. return post( @@ -59,53 +71,59 @@ def _get_token_response(refresh_request_data: Dict, token_request_data: Dict, to verify ) - def _get_token_response_from_identity(refresh_request_data: Dict, token_request_data: Dict, token_endpoint: str, refresh: bool = False, - verify: bool = False) -> requests.Response: + def _get_token_response_from_identity( + refresh_request_data: dict, + token_request_data: dict, + token_endpoint: str, + refresh: Optional[bool] = False, + verify: Optional[bool] = False + ) -> requests.Response: + return requests.get(identity_url, headers={"Metadata": "true"}, verify=verify) if use_identity: if not identity_url: - raise ValueError("Misconfiguration dectected - if use_identity is true, identity_url must also be provided. This is a bug, contact Zepben.") + raise ValueError("Misconfiguration detected - if use_identity is true, identity_url must also be provided. This is a bug, contact Zepben.") return _get_token_response_from_identity else: return _get_token_response -@dataclass -class ZepbenTokenFetcher(object): +@dataclass(init=True, repr=True, eq=True) +class ZepbenTokenFetcher: """ Fetches access tokens from an authentication provider using the OAuth 2.0 protocol. - """ - auth_method: AuthMethod = AuthMethod.OAUTH - """ Deprecated. Kept for backwards compatibility, but this is now unused. """ + :param audience: Audience to use when requesting tokens + :param token_endpoint: The domain of the token issuer. + :param token_request_data: Data to pass in token requests. + :param refresh_request_data: Data to pass in refresh token requests. + :param verify: Passed through to requests.post(). When this is a boolean, it determines whether to verify the HTTPS + certificate of the OAUTH service or not. When this is a string, it is used as the filename of the certificate + truststore to use when verifying the OAUTH service. + :param auth_method: Deprecated. Kept for backwards compatibility, but this is now unused. + """ audience: str - """ Audience to use when requesting tokens """ - - token_endpoint: str - """ The domain of the token issuer. """ - - token_request_data = {} - """ Data to pass in token requests. """ - - refresh_request_data = {} - """ Data to pass in refresh token requests. """ + issuer: Optional[str] = None + token_endpoint: Optional[str] = None + token_request_data: Optional[dict] = field(default_factory=dict) + refresh_request_data: Optional[dict] = field(default_factory=dict) + verify: Optional[bool | str] = None + auth_method: Optional[AuthMethod] = None - verify: Union[bool, str] = True - """ - Passed through to requests.post(). When this is a boolean, it determines whether or not to verify the HTTPS certificate of the OAUTH service. - When this is a string, it is used as the filename of the certificate truststore to use when verifying the OAUTH service. - """ + _request_token: InitVar[Callable[[dict, dict, str, Optional[bool], Optional[bool]], requests.Response]] = None - _request_token: Callable[[Dict, Dict, str, bool, bool], requests.Response] = _fetch_token_generator(False, False) + _access_token: Optional[str] = None + _refresh_token: Optional[str] = None + _token_expiry: Optional[datetime] = datetime.min + token_type: Optional[str] = None - _access_token = None - _refresh_token = None - _token_expiry = datetime.min - _token_type = None + def __post_init__(self, _request_token): + if _request_token is None: + _request_token = _fetch_token_generator(False, False) + self._request_token = _request_token - def __init__(self): self.token_request_data["audience"] = self.audience self.refresh_request_data["audience"] = self.audience @@ -134,7 +152,7 @@ def fetch_token(self) -> str: return f"{self._token_type} {self._access_token}" - def _fetch_token(self, refresh: bool = False): + def _fetch_token(self, refresh: Optional[bool] = False): if refresh: self.refresh_request_data["refresh_token"] = self._refresh_token @@ -174,11 +192,11 @@ def _fetch_token(self, refresh: bool = False): def create_token_fetcher( conf_address: str, - verify_conf: Union[bool, str] = True, - verify_auth: Union[bool, str] = True, - auth_type_field: str = "authType", - audience_field: str = "audience", - issuer_field: str = "issuer", + verify_conf: Optional[bool | str] = True, + verify_auth: Optional[bool | str] = True, + auth_type_field: Optional[str] = "authType", + audience_field: Optional[str] = "audience", + issuer_field: Optional[str] = "issuer", ) -> Optional[ZepbenTokenFetcher]: """ Helper method to fetch auth related configuration from `conf_address` and create a :class:`ZepbenTokenFetcher` @@ -194,6 +212,7 @@ def create_token_fetcher( :returns: A :class:`ZepbenTokenFetcher` if the server reported authentication was configured, otherwise None. """ + with warnings.catch_warnings(): if not verify_conf: warnings.filterwarnings("ignore", category=InsecureRequestWarning) @@ -264,6 +283,7 @@ def create_token_fetcher_managed_identity(identity_url: str, verify_auth: bool) "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5ffcfee6-34cd-4c5c-bb7e-c5261d739341" :param verify_auth: Whether to verify certificates for the identity_url. Only applies for https URLs. """ + return ZepbenTokenFetcher( audience="", issuer="",