From 929befe2f09897c5b0e9fbab98b3f429977d38d5 Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Fri, 29 Aug 2025 12:02:15 +1000 Subject: [PATCH 1/4] missed readme update Signed-off-by: Max Chesterfield --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index a752a5d1..dfdcf193 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ The Python Evolve SDK contains everything necessary to communicate with a [Zepbe # Requirements # -- Python 3.9 or later +- Python 3.10 or later # Installation # From 360970c10647b37e127d3a049eea86c638208af8 Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Fri, 29 Aug 2025 12:02:37 +1000 Subject: [PATCH 2/4] refactor to remove dataclassy, also fixed type hints Signed-off-by: Max Chesterfield --- .../ewb/auth/client/zepben_token_fetcher.py | 120 +++++++++++------- 1 file changed, 76 insertions(+), 44 deletions(-) diff --git a/src/zepben/ewb/auth/client/zepben_token_fetcher.py b/src/zepben/ewb/auth/client/zepben_token_fetcher.py index 821bedce..6025e54e 100644 --- a/src/zepben/ewb/auth/client/zepben_token_fetcher.py +++ b/src/zepben/ewb/auth/client/zepben_token_fetcher.py @@ -7,12 +7,13 @@ __all__ = ["ZepbenTokenFetcher", "create_token_fetcher", "get_token_fetcher", "create_token_fetcher_managed_identity"] import warnings +from dataclasses import dataclass from datetime import datetime -from typing import Optional, Union, Callable, Dict +from typing import Optional, Callable import jwt import requests -from dataclassy import dataclass +from requests import Response from urllib3.exceptions import InsecureRequestWarning from zepben.ewb.auth.common.auth_exception import AuthException @@ -21,11 +22,15 @@ from zepben.ewb.auth.common.auth_provider_config import AuthProviderConfig, create_auth_provider_config, fetch_provider_details -def _fetch_token_generator(is_entraid: bool, use_identity: bool, identity_url: Optional[str] = None) -> Callable[ - [Dict, Dict, str, bool, bool], requests.Response]: +def _fetch_token_generator( + is_entraid: bool, + use_identity: bool, + identity_url: Optional[str] = None +) -> Callable[[dict, dict, str, Optional[bool], Optional[bool]], Response]: + def post( - refresh_request_data: Dict, - token_request_data: Dict, + refresh_request_data: dict, + token_request_data: dict, token_endpoint: str, refresh: bool, verify: bool @@ -48,7 +53,14 @@ def post( verify=verify ) - def _get_token_response(refresh_request_data: Dict, token_request_data: Dict, token_endpoint: str, refresh: bool, verify: bool) -> requests.Response: + def _get_token_response( + refresh_request_data: dict, + token_request_data: dict, + token_endpoint: str, + refresh: bool, + verify: bool + ) -> requests.Response: + refresh = not is_entraid and refresh # At the moment Azure auth doesn't support refresh tokens. So we always force new tokens. return post( @@ -59,53 +71,71 @@ def _get_token_response(refresh_request_data: Dict, token_request_data: Dict, to verify ) - def _get_token_response_from_identity(refresh_request_data: Dict, token_request_data: Dict, token_endpoint: str, refresh: bool = False, - verify: bool = False) -> requests.Response: + def _get_token_response_from_identity( + refresh_request_data: dict, + token_request_data: dict, + token_endpoint: str, + refresh: Optional[bool] = False, + verify: Optional[bool] = False + ) -> requests.Response: + return requests.get(identity_url, headers={"Metadata": "true"}, verify=verify) if use_identity: if not identity_url: - raise ValueError("Misconfiguration dectected - if use_identity is true, identity_url must also be provided. This is a bug, contact Zepben.") + raise ValueError("Misconfiguration detected - if use_identity is true, identity_url must also be provided. This is a bug, contact Zepben.") return _get_token_response_from_identity else: return _get_token_response @dataclass -class ZepbenTokenFetcher(object): +class ZepbenTokenFetcher: """ Fetches access tokens from an authentication provider using the OAuth 2.0 protocol. - """ - - auth_method: AuthMethod = AuthMethod.OAUTH - """ Deprecated. Kept for backwards compatibility, but this is now unused. """ - audience: str - """ Audience to use when requesting tokens """ - - token_endpoint: str - """ The domain of the token issuer. """ - - token_request_data = {} - """ Data to pass in token requests. """ - - refresh_request_data = {} - """ Data to pass in refresh token requests. """ - - verify: Union[bool, str] = True - """ - Passed through to requests.post(). When this is a boolean, it determines whether or not to verify the HTTPS certificate of the OAUTH service. - When this is a string, it is used as the filename of the certificate truststore to use when verifying the OAUTH service. + :param audience: Audience to use when requesting tokens + :param token_endpoint: The domain of the token issuer. + :param token_request_data: Data to pass in token requests. + :param refresh_request_data: Data to pass in refresh token requests. + :param verify: Passed through to requests.post(). When this is a boolean, it determines whether to verify the HTTPS + certificate of the OAUTH service or not. When this is a string, it is used as the filename of the certificate + truststore to use when verifying the OAUTH service. + :param auth_method: Deprecated. Kept for backwards compatibility, but this is now unused. """ - _request_token: Callable[[Dict, Dict, str, bool, bool], requests.Response] = _fetch_token_generator(False, False) - - _access_token = None - _refresh_token = None - _token_expiry = datetime.min - _token_type = None + def __init__( + self, + audience: str, + issuer: Optional[str] = None, # TODO: document in params + token_endpoint: Optional[str] = None, + token_request_data: Optional[dict] = None, + refresh_request_data: Optional[dict] = None, + verify: Optional[bool | str] = True, + auth_method: Optional[AuthMethod] = AuthMethod.OAUTH, + + _request_token: Optional[ + Callable[[dict, dict, str, Optional[bool], Optional[bool]], requests.Response] + ] = _fetch_token_generator(False, False), + + _access_token=None, + _refresh_token=None, + _token_expiry=datetime.min, + _token_type=None, + ): + self.audience: str = audience + self.issuer: str = issuer + self.token_endpoint: str = token_endpoint + self.token_request_data: dict = token_request_data if token_request_data is not None else {} + self.refresh_request_data: dict = refresh_request_data if refresh_request_data is not None else {} + self.verify: bool | str = verify + self.auth_method: AuthMethod = auth_method + self._request_token = _request_token + self._access_token = _access_token + self._refresh_token = _refresh_token + self._token_expiry = _token_expiry + self.token_type = _token_type - def __init__(self): self.token_request_data["audience"] = self.audience self.refresh_request_data["audience"] = self.audience @@ -134,7 +164,7 @@ def fetch_token(self) -> str: return f"{self._token_type} {self._access_token}" - def _fetch_token(self, refresh: bool = False): + def _fetch_token(self, refresh: Optional[bool] = False): if refresh: self.refresh_request_data["refresh_token"] = self._refresh_token @@ -174,11 +204,11 @@ def _fetch_token(self, refresh: bool = False): def create_token_fetcher( conf_address: str, - verify_conf: Union[bool, str] = True, - verify_auth: Union[bool, str] = True, - auth_type_field: str = "authType", - audience_field: str = "audience", - issuer_field: str = "issuer", + verify_conf: Optional[bool | str] = True, + verify_auth: Optional[bool | str] = True, + auth_type_field: Optional[str] = "authType", + audience_field: Optional[str] = "audience", + issuer_field: Optional[str] = "issuer", ) -> Optional[ZepbenTokenFetcher]: """ Helper method to fetch auth related configuration from `conf_address` and create a :class:`ZepbenTokenFetcher` @@ -194,6 +224,7 @@ def create_token_fetcher( :returns: A :class:`ZepbenTokenFetcher` if the server reported authentication was configured, otherwise None. """ + with warnings.catch_warnings(): if not verify_conf: warnings.filterwarnings("ignore", category=InsecureRequestWarning) @@ -264,6 +295,7 @@ def create_token_fetcher_managed_identity(identity_url: str, verify_auth: bool) "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5ffcfee6-34cd-4c5c-bb7e-c5261d739341" :param verify_auth: Whether to verify certificates for the identity_url. Only applies for https URLs. """ + return ZepbenTokenFetcher( audience="", issuer="", From af51e0321fc9392d6b8fe96e498c5b75215532e0 Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Fri, 29 Aug 2025 12:29:37 +1000 Subject: [PATCH 3/4] should do dataclasses properly though Signed-off-by: Max Chesterfield --- .../ewb/auth/client/zepben_token_fetcher.py | 52 +++++++------------ 1 file changed, 20 insertions(+), 32 deletions(-) diff --git a/src/zepben/ewb/auth/client/zepben_token_fetcher.py b/src/zepben/ewb/auth/client/zepben_token_fetcher.py index 6025e54e..9c004ccc 100644 --- a/src/zepben/ewb/auth/client/zepben_token_fetcher.py +++ b/src/zepben/ewb/auth/client/zepben_token_fetcher.py @@ -7,7 +7,7 @@ __all__ = ["ZepbenTokenFetcher", "create_token_fetcher", "get_token_fetcher", "create_token_fetcher_managed_identity"] import warnings -from dataclasses import dataclass +from dataclasses import dataclass, Field, field, InitVar from datetime import datetime from typing import Optional, Callable @@ -89,7 +89,7 @@ def _get_token_response_from_identity( return _get_token_response -@dataclass +@dataclass(init=True, repr=True, eq=True) class ZepbenTokenFetcher: """ Fetches access tokens from an authentication provider using the OAuth 2.0 protocol. @@ -104,37 +104,25 @@ class ZepbenTokenFetcher: :param auth_method: Deprecated. Kept for backwards compatibility, but this is now unused. """ - def __init__( - self, - audience: str, - issuer: Optional[str] = None, # TODO: document in params - token_endpoint: Optional[str] = None, - token_request_data: Optional[dict] = None, - refresh_request_data: Optional[dict] = None, - verify: Optional[bool | str] = True, - auth_method: Optional[AuthMethod] = AuthMethod.OAUTH, - - _request_token: Optional[ - Callable[[dict, dict, str, Optional[bool], Optional[bool]], requests.Response] - ] = _fetch_token_generator(False, False), - - _access_token=None, - _refresh_token=None, - _token_expiry=datetime.min, - _token_type=None, - ): - self.audience: str = audience - self.issuer: str = issuer - self.token_endpoint: str = token_endpoint - self.token_request_data: dict = token_request_data if token_request_data is not None else {} - self.refresh_request_data: dict = refresh_request_data if refresh_request_data is not None else {} - self.verify: bool | str = verify - self.auth_method: AuthMethod = auth_method + audience: str + issuer: Optional[str] = None + token_endpoint: Optional[str] = None + token_request_data: Optional[dict] = field(default_factory=dict) + refresh_request_data: Optional[dict] = field(default_factory=dict) + verify: Optional[bool | str] = None + auth_method: Optional[AuthMethod] = None + + _request_token: InitVar[Callable[[dict, dict, str, Optional[bool], Optional[bool]], requests.Response]] = None + + _access_token: Optional[str] = None + _refresh_token: Optional[str] = None + _token_expiry: Optional[datetime] = datetime.min + token_type: Optional[str] = None + + def __post_init__(self, _request_token): + if _request_token is None: + _request_token = _fetch_token_generator(False, False) self._request_token = _request_token - self._access_token = _access_token - self._refresh_token = _refresh_token - self._token_expiry = _token_expiry - self.token_type = _token_type self.token_request_data["audience"] = self.audience self.refresh_request_data["audience"] = self.audience From 5f6489f970d693a14d6b8665a3a1f1bf9dd3d2f3 Mon Sep 17 00:00:00 2001 From: Max Chesterfield Date: Fri, 29 Aug 2025 12:44:04 +1000 Subject: [PATCH 4/4] changelog Signed-off-by: Max Chesterfield --- changelog.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.md b/changelog.md index e5980342..9c7ee4b7 100644 --- a/changelog.md +++ b/changelog.md @@ -10,7 +10,7 @@ * None. ### Fixes -* None. +* Moved ZepbenTokenAuth to use python dataclasses instead of `zepben.ewb.dataclassy`, existing code should work as is. ### Notes * None.