Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The Python Evolve SDK contains everything necessary to communicate with a [Zepbe

# Requirements #

- Python 3.9 or later
- Python 3.10 or later

# Installation #

Expand Down
2 changes: 1 addition & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* None.

### Fixes
* None.
* Moved ZepbenTokenAuth to use python dataclasses instead of `zepben.ewb.dataclassy`, existing code should work as is.

### Notes
* None.
Expand Down
104 changes: 62 additions & 42 deletions src/zepben/ewb/auth/client/zepben_token_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
__all__ = ["ZepbenTokenFetcher", "create_token_fetcher", "get_token_fetcher", "create_token_fetcher_managed_identity"]

import warnings
from dataclasses import dataclass, Field, field, InitVar
from datetime import datetime
from typing import Optional, Union, Callable, Dict
from typing import Optional, Callable

import jwt
import requests
from dataclassy import dataclass
from requests import Response
from urllib3.exceptions import InsecureRequestWarning

from zepben.ewb.auth.common.auth_exception import AuthException
Expand All @@ -21,11 +22,15 @@
from zepben.ewb.auth.common.auth_provider_config import AuthProviderConfig, create_auth_provider_config, fetch_provider_details


def _fetch_token_generator(is_entraid: bool, use_identity: bool, identity_url: Optional[str] = None) -> Callable[
[Dict, Dict, str, bool, bool], requests.Response]:
def _fetch_token_generator(
is_entraid: bool,
use_identity: bool,
identity_url: Optional[str] = None
) -> Callable[[dict, dict, str, Optional[bool], Optional[bool]], Response]:

def post(
refresh_request_data: Dict,
token_request_data: Dict,
refresh_request_data: dict,
token_request_data: dict,
token_endpoint: str,
refresh: bool,
verify: bool
Expand All @@ -48,7 +53,14 @@ def post(
verify=verify
)

def _get_token_response(refresh_request_data: Dict, token_request_data: Dict, token_endpoint: str, refresh: bool, verify: bool) -> requests.Response:
def _get_token_response(
refresh_request_data: dict,
token_request_data: dict,
token_endpoint: str,
refresh: bool,
verify: bool
) -> requests.Response:

refresh = not is_entraid and refresh # At the moment Azure auth doesn't support refresh tokens. So we always force new tokens.

return post(
Expand All @@ -59,53 +71,59 @@ def _get_token_response(refresh_request_data: Dict, token_request_data: Dict, to
verify
)

def _get_token_response_from_identity(refresh_request_data: Dict, token_request_data: Dict, token_endpoint: str, refresh: bool = False,
verify: bool = False) -> requests.Response:
def _get_token_response_from_identity(
refresh_request_data: dict,
token_request_data: dict,
token_endpoint: str,
refresh: Optional[bool] = False,
verify: Optional[bool] = False
) -> requests.Response:

return requests.get(identity_url, headers={"Metadata": "true"}, verify=verify)

if use_identity:
if not identity_url:
raise ValueError("Misconfiguration dectected - if use_identity is true, identity_url must also be provided. This is a bug, contact Zepben.")
raise ValueError("Misconfiguration detected - if use_identity is true, identity_url must also be provided. This is a bug, contact Zepben.")
return _get_token_response_from_identity
else:
return _get_token_response


@dataclass
class ZepbenTokenFetcher(object):
@dataclass(init=True, repr=True, eq=True)
class ZepbenTokenFetcher:
"""
Fetches access tokens from an authentication provider using the OAuth 2.0 protocol.
"""

auth_method: AuthMethod = AuthMethod.OAUTH
""" Deprecated. Kept for backwards compatibility, but this is now unused. """
:param audience: Audience to use when requesting tokens
:param token_endpoint: The domain of the token issuer.
:param token_request_data: Data to pass in token requests.
:param refresh_request_data: Data to pass in refresh token requests.
:param verify: Passed through to requests.post(). When this is a boolean, it determines whether to verify the HTTPS
certificate of the OAUTH service or not. When this is a string, it is used as the filename of the certificate
truststore to use when verifying the OAUTH service.
:param auth_method: Deprecated. Kept for backwards compatibility, but this is now unused.
"""

audience: str
""" Audience to use when requesting tokens """

token_endpoint: str
""" The domain of the token issuer. """

token_request_data = {}
""" Data to pass in token requests. """

refresh_request_data = {}
""" Data to pass in refresh token requests. """
issuer: Optional[str] = None
token_endpoint: Optional[str] = None
token_request_data: Optional[dict] = field(default_factory=dict)
refresh_request_data: Optional[dict] = field(default_factory=dict)
verify: Optional[bool | str] = None
auth_method: Optional[AuthMethod] = None

verify: Union[bool, str] = True
"""
Passed through to requests.post(). When this is a boolean, it determines whether or not to verify the HTTPS certificate of the OAUTH service.
When this is a string, it is used as the filename of the certificate truststore to use when verifying the OAUTH service.
"""
_request_token: InitVar[Callable[[dict, dict, str, Optional[bool], Optional[bool]], requests.Response]] = None

_request_token: Callable[[Dict, Dict, str, bool, bool], requests.Response] = _fetch_token_generator(False, False)
_access_token: Optional[str] = None
_refresh_token: Optional[str] = None
_token_expiry: Optional[datetime] = datetime.min
token_type: Optional[str] = None

_access_token = None
_refresh_token = None
_token_expiry = datetime.min
_token_type = None
def __post_init__(self, _request_token):
if _request_token is None:
_request_token = _fetch_token_generator(False, False)
self._request_token = _request_token

def __init__(self):
self.token_request_data["audience"] = self.audience
self.refresh_request_data["audience"] = self.audience

Expand Down Expand Up @@ -134,7 +152,7 @@ def fetch_token(self) -> str:

return f"{self._token_type} {self._access_token}"

def _fetch_token(self, refresh: bool = False):
def _fetch_token(self, refresh: Optional[bool] = False):
if refresh:
self.refresh_request_data["refresh_token"] = self._refresh_token

Expand Down Expand Up @@ -174,11 +192,11 @@ def _fetch_token(self, refresh: bool = False):

def create_token_fetcher(
conf_address: str,
verify_conf: Union[bool, str] = True,
verify_auth: Union[bool, str] = True,
auth_type_field: str = "authType",
audience_field: str = "audience",
issuer_field: str = "issuer",
verify_conf: Optional[bool | str] = True,
verify_auth: Optional[bool | str] = True,
auth_type_field: Optional[str] = "authType",
audience_field: Optional[str] = "audience",
issuer_field: Optional[str] = "issuer",
) -> Optional[ZepbenTokenFetcher]:
"""
Helper method to fetch auth related configuration from `conf_address` and create a :class:`ZepbenTokenFetcher`
Expand All @@ -194,6 +212,7 @@ def create_token_fetcher(

:returns: A :class:`ZepbenTokenFetcher` if the server reported authentication was configured, otherwise None.
"""

with warnings.catch_warnings():
if not verify_conf:
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
Expand Down Expand Up @@ -264,6 +283,7 @@ def create_token_fetcher_managed_identity(identity_url: str, verify_auth: bool)
"http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=5ffcfee6-34cd-4c5c-bb7e-c5261d739341"
:param verify_auth: Whether to verify certificates for the identity_url. Only applies for https URLs.
"""

return ZepbenTokenFetcher(
audience="",
issuer="",
Expand Down