From bc17820b6500fa5e4fcf02af9f0825887d3a4ae8 Mon Sep 17 00:00:00 2001 From: Andrea Lamparelli Date: Fri, 22 Nov 2024 15:32:25 +0100 Subject: [PATCH] Make oidc ssl cert verification configurable Signed-off-by: Andrea Lamparelli --- src/horreum/configs.py | 4 +++- src/horreum/horreum_client.py | 27 ++++++++++++++++--------- src/horreum/keycloak_access_provider.py | 5 +++-- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/horreum/configs.py b/src/horreum/configs.py index 58aeed5..44d1c7b 100644 --- a/src/horreum/configs.py +++ b/src/horreum/configs.py @@ -1,6 +1,6 @@ from dataclasses import dataclass -from typing import Optional from enum import Enum +from typing import Optional import httpx from kiota_abstractions.request_option import RequestOption @@ -27,3 +27,5 @@ class ClientConfiguration: options: Optional[dict[str, RequestOption]] = None # which authentication method to use auth_method: AuthMethod = AuthMethod.BEARER + # SSL cert verification against the oidc provider + auth_verify: bool = True diff --git a/src/horreum/horreum_client.py b/src/horreum/horreum_client.py index 9d08621..176a1b1 100644 --- a/src/horreum/horreum_client.py +++ b/src/horreum/horreum_client.py @@ -1,9 +1,9 @@ +import base64 +import logging from importlib.metadata import version from typing import Optional -import base64 import httpx -import logging from kiota_abstractions.authentication import AuthenticationProvider, ApiKeyAuthenticationProvider, KeyLocation from kiota_abstractions.authentication.access_token_provider import AccessTokenProvider from kiota_abstractions.authentication.anonymous_authentication_provider import AnonymousAuthenticationProvider @@ -21,16 +21,17 @@ logger = logging.getLogger(__name__) -async def setup_auth_provider(base_url: str, username: str, password: str) -> AccessTokenProvider: + +async def setup_auth_provider(base_url: str, username: str, password: str, http_client: httpx.AsyncClient = None, + verify: bool = True) -> AccessTokenProvider: # Use not authenticated client to fetch the auth mechanism auth_provider = AnonymousAuthenticationProvider() - req_adapter = HttpxRequestAdapter(auth_provider) + req_adapter = HttpxRequestAdapter(authentication_provider=auth_provider, http_client=http_client) req_adapter.base_url = base_url auth_client = HorreumRawClient(req_adapter) auth_config = await auth_client.api.config.keycloak.get() - # TODO: we could generalize using a generic OIDC client - return KeycloakAccessProvider(auth_config, username, password) + return KeycloakAccessProvider(auth_config, username, password, verify) class HorreumClient: @@ -49,6 +50,7 @@ def __init__(self, base_url: str, credentials: Optional[HorreumCredentials], self.__base_url = base_url self.__credentials = credentials self.__client_config = client_config + self.__auth_verify = client_config.auth_verify if client_config is not None else True if client_config and client_config.http_client and client_config.use_default_middlewares: self.__http_client = KiotaClientFactory.create_with_default_middleware(client=client_config.http_client, @@ -62,20 +64,25 @@ async def setup(self): """ if self.__credentials: - if self.__credentials.apikey is not None and (self.__client_config is None or self.__client_config.auth_method == AuthMethod.API_KEY): + if self.__credentials.apikey is not None and ( + self.__client_config is None or self.__client_config.auth_method == AuthMethod.API_KEY): # API key authentication - self.auth_provider = ApiKeyAuthenticationProvider(KeyLocation.Header, self.__credentials.apikey, "X-Horreum-API-Key") + self.auth_provider = ApiKeyAuthenticationProvider(KeyLocation.Header, self.__credentials.apikey, + "X-Horreum-API-Key") logger.info('Using API Key authentication') elif self.__credentials.username is not None: if self.__client_config is None or self.__client_config.auth_method == AuthMethod.BEARER: # Bearer token authentication - access_provider = await setup_auth_provider(self.__base_url, self.__credentials.username, self.__credentials.password) + access_provider = await setup_auth_provider(self.__base_url, self.__credentials.username, + self.__credentials.password, self.__http_client, + self.__auth_verify) self.auth_provider = BaseBearerTokenAuthenticationProvider(access_provider) logger.info('Using OIDC bearer token authentication') elif self.__client_config.auth_method == AuthMethod.BASIC: # Basic authentication - basic = "Basic " + base64.b64encode((self.__credentials.username + ":" + self.__credentials.password).encode()).decode() + basic = "Basic " + base64.b64encode( + (self.__credentials.username + ":" + self.__credentials.password).encode()).decode() self.auth_provider = ApiKeyAuthenticationProvider(KeyLocation.Header, basic, "Authentication") logger.info('Using Basic HTTP authentication') elif self.__credentials.password is not None: diff --git a/src/horreum/keycloak_access_provider.py b/src/horreum/keycloak_access_provider.py index 9cbef72..eed5349 100644 --- a/src/horreum/keycloak_access_provider.py +++ b/src/horreum/keycloak_access_provider.py @@ -12,7 +12,7 @@ class KeycloakAccessProvider(AccessTokenProvider): username: str password: str - def __init__(self, config: KeycloakConfig, username: str, password: str): + def __init__(self, config: KeycloakConfig, username: str, password: str, verify: bool = True): super() self.config = config self.username = username @@ -20,7 +20,8 @@ def __init__(self, config: KeycloakConfig, username: str, password: str): self.keycloak_openid = KeycloakOpenID( server_url=config.url, client_id=config.client_id, - realm_name=config.realm + realm_name=config.realm, + verify=verify ) async def get_authorization_token(self, uri: str, additional_authentication_context: Dict[str, Any] = {}) -> str: