diff --git a/tests/unit/authentication/test_api_key_token.py b/tests/unit/authentication/test_api_key_token.py index 988494881..b16be1959 100644 --- a/tests/unit/authentication/test_api_key_token.py +++ b/tests/unit/authentication/test_api_key_token.py @@ -13,7 +13,14 @@ @pytest.fixture def default_api_key_token_configuration() -> APIKeyTokenConfiguration: - """Default APIKeyTokenConfiguration for testing.""" + """Default APIKeyTokenConfiguration for testing. + + Provide a default APIKeyTokenConfiguration for tests. + + Returns: + APIKeyTokenConfiguration: configuration with `api_key` set to + `SecretStr("some-test-api-key")`. + """ return APIKeyTokenConfiguration(api_key=SecretStr("some-test-api-key")) diff --git a/tests/unit/authentication/test_jwk_token.py b/tests/unit/authentication/test_jwk_token.py index b2b1cd2ff..48763d952 100644 --- a/tests/unit/authentication/test_jwk_token.py +++ b/tests/unit/authentication/test_jwk_token.py @@ -21,13 +21,31 @@ @pytest.fixture def token_header(single_key_set: list[dict[str, Any]]) -> dict[str, Any]: - """A sample token header.""" + """A sample token header. + + Create a JWT header using RS256 and the first key's `kid` from the provided key set. + + Parameters: + single_key_set (list[dict]): List of signing key dictionaries; the + first element must contain a `"kid"`. + + Returns: + dict: JWT header with keys `"alg": "RS256"`, `"typ": "JWT"`, and + `"kid"` set to the first key's `kid`. + """ return {"alg": "RS256", "typ": "JWT", "kid": single_key_set[0]["kid"]} @pytest.fixture def token_payload() -> dict[str, Any]: - """A sample token payload with the default user_id and username claims.""" + """A sample token payload with the default user_id and username claims. + + Create a sample JWT payload containing the test user claims and timing claims. + + Returns: + dict: A mapping with keys "user_id", "username", "exp", and "iat"; + "exp" and "iat" are UNIX timestamps (seconds since epoch). + """ return { "user_id": TEST_USER_ID, "username": TEST_USER_NAME, @@ -54,7 +72,14 @@ def single_key_set() -> list[dict[str, Any]]: @pytest.fixture def another_single_key_set() -> list[dict[str, Any]]: - """Same as single_key_set, but generates a different key pair by being its own fixture.""" + """Same as single_key_set, but generates a different key pair by being its own fixture. + + Create a single-key JWK set using a newly generated RSA key. + + Returns: + list[dict[str, Any]]: A list containing one key dict with keys + `private_key`, `public_key`, and `kid`. + """ return [make_key()] @@ -64,7 +89,19 @@ def valid_token( token_header: dict[str, Any], token_payload: dict[str, Any], ) -> str: - """A token that is valid and signed with the signing keys.""" + """A token that is valid and signed with the signing keys. + + Create a JWT signed with the first private key from a single-key JWK set using RS256. + + Parameters: + single_key_set (list[dict[str, Any]]): List of key dicts where the + first entry must contain a 'private_key' used to sign the token. + token_header (dict[str, Any]): JWT header values to include in the token. + token_payload (dict[str, Any]): JWT claims to include in the token. + + Returns: + str: The compact serialized JWT signed with the provided private key. + """ jwt_instance = JsonWebToken(algorithms=["RS256"]) return jwt_instance.encode( token_header, token_payload, single_key_set[0]["private_key"] @@ -82,7 +119,30 @@ def clear_jwk_cache() -> Generator: def make_signing_server( mocker: MockerFixture, key_set: list[dict[str, Any]], algorithms: list[str] ) -> Any: - """A fake server to serve our signing keys as JWKs.""" + """A fake server to serve our signing keys as JWKs. + + Create and patch a mocked aiohttp.ClientSession that serves a JWKS response + derived from the provided key set. + + Parameters: + mocker (pytest.MockerFixture): Pytest mocker used to patch aiohttp.ClientSession. + key_set (list[dict[str, Any]]): List of signing key dicts; each item + must include a `private_key` with an `as_dict(private=False)` method + and a `kid` value. + algorithms (list[str]): List of JWK `alg` values to assign to each + corresponding key in `key_set`. + + Returns: + Any: The mocked ClientSession class (the value assigned to the patched + `aiohttp.ClientSession`). The mock is configured so that: + - Instantiating the session returns an async context manager. + - Calling `await session.get(...)` returns an async context manager + whose entered value is a response object. + - The response's `json()` returns `{"keys": keys}` where each key is + the public JWK derived from `private_key.as_dict(private=False)` + extended with `kid` and `alg`. + - `response.raise_for_status()` is a no-op. + """ mock_session_class = mocker.patch("aiohttp.ClientSession") mock_response = mocker.AsyncMock() @@ -122,13 +182,34 @@ def make_signing_server( def mocked_signing_keys_server( mocker: MockerFixture, single_key_set: list[dict[str, Any]] ) -> None: - """Single-key signing server.""" + """Single-key signing server. + + Create and register a mocked signing keys HTTP server that serves a single RS256 JWK set. + + Parameters: + mocker (pytest_mock.MockerFixture): Pytest-mock fixture used to patch + aiohttp.ClientSession and related network calls. + single_key_set (list[dict[str, Any]]): A list containing one JWK dict + (public key representation) that will be returned by the mocked JWKS + endpoint. + """ return make_signing_server(mocker, single_key_set, ["RS256"]) @pytest.fixture def default_jwk_configuration() -> JwkConfiguration: - """Default JwkConfiguration for testing.""" + """Default JwkConfiguration for testing. + + Create a default JwkConfiguration preconfigured for tests. + + The returned configuration uses a mocked JWKS URL and a JwtConfiguration + that maps the user identifier to the `user_id` claim and the username to + the `username` claim. + + Returns: + JwkConfiguration: Configuration with a mocked JWKS URL and default + claim mappings (`user_id` and `username`). + """ return JwkConfiguration( url=AnyHttpUrl("https://this#isgonnabemocked.com/jwks.json"), jwt_configuration=JwtConfiguration( @@ -139,7 +220,17 @@ def default_jwk_configuration() -> JwkConfiguration: def dummy_request(token: str) -> Request: - """Generate a dummy request with a given token.""" + """Generate a dummy request with a given token. + + Create a FastAPI Request with an Authorization Bearer header containing the provided token. + + Parameters: + token (str): Token string to place after the "Bearer " prefix in the Authorization header. + + Returns: + request (Request): FastAPI Request object with the Authorization header + set to "Bearer ". + """ return Request( scope={ "type": "http", @@ -151,7 +242,14 @@ def dummy_request(token: str) -> Request: @pytest.fixture def no_token_request() -> Request: - """Dummy request with no token.""" + """Dummy request with no token. + + Create a FastAPI Request that contains no Authorization header. + + Returns: + request (Request): A request object with an HTTP scope and an empty + headers list (no Authorization present). + """ return Request( scope={ "type": "http", @@ -163,7 +261,13 @@ def no_token_request() -> Request: @pytest.fixture def not_bearer_token_request() -> Request: - """Dummy request with no token.""" + """Dummy request with no token. + + Create a FastAPI Request whose Authorization header uses a non-Bearer scheme. + + Returns: + Request: A request with an Authorization header set to "NotBearer anything". + """ return Request( scope={ "type": "http", @@ -174,7 +278,18 @@ def not_bearer_token_request() -> Request: def set_auth_header(request: Request, token: str) -> None: - """Helper function to set the Authorization header in a request.""" + """Helper function to set the Authorization header in a request. + + Replace the Request's Authorization header with the given token. + + This mutates request.scope["headers"] to remove any existing Authorization + header and append a new one using the provided token value. The token + parameter should be the full header value (for example, "Bearer "). + + Parameters: + request (Request): FastAPI/Starlette Request whose headers will be modified. + token (str): Full Authorization header value to set (e.g., "Bearer "). + """ new_headers = [ (k, v) for k, v in request.scope["headers"] if k.lower() != b"authorization" ] @@ -183,7 +298,19 @@ def set_auth_header(request: Request, token: str) -> None: def ensure_test_user_id_and_name(auth_tuple: tuple, expected_token: str) -> None: - """Utility to ensure that the values in the auth tuple match the test values.""" + """Utility to ensure that the values in the auth tuple match the test values. + + Assert that an authentication tuple contains the expected test user values. + + Parameters: + auth_tuple (tuple): A 4-tuple in the form (user_id, username, skip_userid_check, token). + expected_token (str): The token value expected to be present as the fourth element. + + Raises: + AssertionError: If any element of auth_tuple does not match the expected test values + (user id equals TEST_USER_ID, username equals TEST_USER_NAME, + skip_userid_check is False, and token equals expected_token). + """ user_id, username, skip_userid_check, token = auth_tuple assert user_id == TEST_USER_ID assert username == TEST_USER_NAME @@ -212,7 +339,20 @@ def expired_token( token_header: dict[str, Any], token_payload: dict[str, Any], ) -> str: - """An well-signed yet expired token.""" + """An well-signed yet expired token. + + Create a JWT that is correctly signed but has an expiration time set in the past. + + Parameters: + single_key_set (list[dict]): A list of key dicts; the first element's + `private_key` is used to sign the token. + token_header (dict): JWT header values to include in the token. + token_payload (dict): JWT payload values; this function overwrites + `exp` to a past timestamp. + + Returns: + str: The signed JWT as a string with an expired `exp` claim. + """ jwt_instance = JsonWebToken(algorithms=["RS256"]) token_payload["exp"] = int(time.time()) - 3600 # Set expiration in the past return jwt_instance.encode( @@ -225,7 +365,14 @@ async def test_expired( mocked_signing_keys_server: Any, expired_token: str, ) -> None: - """Test with an expired token.""" + """Test with an expired token. + + Verifies that JwkTokenAuthDependency rejects an expired JWT. + + Asserts that calling the dependency with an expired token raises an + HTTPException with status code 401 and a message containing "Token has + expired". + """ _ = mocked_signing_keys_server dependency = JwkTokenAuthDependency(default_jwk_configuration) @@ -244,7 +391,21 @@ def invalid_token( token_header: dict[str, Any], token_payload: dict[str, Any], ) -> str: - """A token that is signed with different keys than the signing keys.""" + """A token that is signed with different keys than the signing keys. + + Create a JWT signed with a key different from the expected signing keys for + use in invalid-signature tests. + + Parameters: + another_single_key_set (list[dict[str, Any]]): A key set whose first + entry's "private_key" will be used to sign the token; should not match + the verifier's keys. + token_header (dict[str, Any]): JWT header to encode. + token_payload (dict[str, Any]): JWT claims to encode. + + Returns: + str: The serialized JWT as a compact string. + """ jwt_instance = JsonWebToken(algorithms=["RS256"]) return jwt_instance.encode( token_header, token_payload, another_single_key_set[0]["private_key"] @@ -313,7 +474,17 @@ def no_user_id_token( token_payload: dict[str, Any], token_header: dict[str, Any], ) -> str: - """Token without a user_id claim.""" + """Token without a user_id claim. + + Create a signed JWT that omits the `user_id` claim. + + The token is encoded using the provided header and the first private key in + `single_key_set`; the supplied `token_payload` is modified in-place to + remove `user_id`. + + Returns: + jwt (str): Encoded JWT as a string that does not contain the `user_id` claim. + """ jwt_instance = JsonWebToken(algorithms=["RS256"]) # Modify the token payload to include different claims del token_payload["user_id"] @@ -348,7 +519,13 @@ def no_username_token( token_payload: dict[str, Any], token_header: dict[str, Any], ) -> str: - """Token without a username claim.""" + """Token without a username claim. + + Create a JWT signed with the provided private key that omits the `username` claim. + + Returns: + A compact JWT string (signed) that does not contain the `username` claim. + """ jwt_instance = JsonWebToken(algorithms=["RS256"]) # Modify the token payload to include different claims del token_payload["username"] @@ -383,7 +560,20 @@ def custom_claims_token( token_payload: dict[str, Any], token_header: dict[str, Any], ) -> str: - """Token with custom claims.""" + """Token with custom claims. + + Create an RS256-signed JWT that uses custom claim names for the user id and username. + + Parameters: + single_key_set (list[dict[str, Any]]): List of signing key dicts; the + first entry's `private_key` is used to sign the token. + token_payload (dict[str, Any]): Base payload; `user_id` and `username` + are replaced with `id_of_the_user` and `name_of_the_user`. + token_header (dict[str, Any]): JWT header to include in the token. + + Returns: + str: The encoded JWT as a string. + """ jwt_instance = JsonWebToken(algorithms=["RS256"]) del token_payload["user_id"] @@ -402,7 +592,17 @@ def custom_claims_token( def custom_claims_configuration( default_jwk_configuration: JwkConfiguration, ) -> JwkConfiguration: - """Configuration for custom claims.""" + """Configuration for custom claims. + + Create a JwkConfiguration that maps custom JWT claim names for user ID and username. + + Parameters: + default_jwk_configuration (JwkConfiguration): Base configuration to copy and modify. + + Returns: + JwkConfiguration: A copy of the input configuration with `jwt_configuration.user_id_claim` + set to "id_of_the_user" and `jwt_configuration.username_claim` set to "name_of_the_user". + """ # Create a copy of the default configuration custom_config = default_jwk_configuration.model_copy() @@ -437,31 +637,75 @@ def token_header_256_1(multi_key_set: list[dict[str, Any]]) -> dict[str, Any]: @pytest.fixture def token_header_256_2(multi_key_set: list[dict[str, Any]]) -> dict[str, Any]: - """A sample token header for RS256 using multi_key_set.""" + """A sample token header for RS256 using multi_key_set. + + Create a JWT header for RS256 that references the second key in a multi-key set. + + Parameters: + multi_key_set (list[dict[str, Any]]): List of JWK-like dicts where each + dict contains a `"kid"` entry; the second entry (index 1) is used. + + Returns: + dict[str, Any]: JWT header with keys `"alg": "RS256"`, `"typ": "JWT"`, + and `"kid"` taken from `multi_key_set[1]["kid"]`. + """ return {"alg": "RS256", "typ": "JWT", "kid": multi_key_set[1]["kid"]} @pytest.fixture def token_header_384(multi_key_set: list[dict[str, Any]]) -> dict[str, Any]: - """A sample token header.""" + """A sample token header. + + Builds a JWT header for RS384 using the third key's `kid` from a multi-key set. + + Parameters: + multi_key_set (list[dict[str, Any]]): A list of JWK-like dicts; must + contain at least three entries. The `kid` from the item at index 2 is + used. + + Returns: + dict[str, Any]: JWT header with keys `"alg": "RS384"`, `"typ": "JWT"`, + and `"kid"` set to the third key's `kid`. + """ return {"alg": "RS384", "typ": "JWT", "kid": multi_key_set[2]["kid"]} @pytest.fixture def token_header_256_no_kid() -> dict[str, Any]: - """RS256 no kid.""" + """RS256 no kid. + + JWT header indicating the RS256 algorithm and intentionally omitting a key ID. + + Returns: + header (dict[str, Any]): JWT header with "alg" set to "RS256" and no "kid" field. + """ return {"alg": "RS256", "typ": "JWT"} @pytest.fixture def token_header_384_no_kid() -> dict[str, Any]: - """RS384 no kid.""" + """RS384 no kid. + + Create a JWT header for the RS384 algorithm that omits the `kid` field. + + Returns: + header (dict): JWT header with `"alg": "RS384"` and `"typ": "JWT"`, without a `kid` entry. + """ return {"alg": "RS384", "typ": "JWT"} @pytest.fixture def multi_key_set() -> list[dict[str, Any]]: - """Default multi-key set for signing tokens.""" + """Default multi-key set for signing tokens. + + Create a list of three distinct RSA signing key dictionaries for multi-key tests. + + Each dictionary contains the generated key pair and identifier fields used + by tests (e.g., `private_key`, `public_key`, and `kid`). + + Returns: + key_set (list[dict]): A list of three signing key dictionaries. + """ return [make_key(), make_key(), make_key()] @@ -473,7 +717,17 @@ def valid_tokens( token_payload: dict[str, Any], token_header_384: dict[str, Any], ) -> tuple[str, str, str]: - """Generate valid tokens for each key in the multi-key set.""" + """Generate valid tokens for each key in the multi-key set. + + Generate three valid JWTs signed by the three keys in the provided + multi-key set using the given headers and payload. + + Returns: + tuple[str, str, str]: A tuple of JWT strings (token1, token2, token3) + signed with multi_key_set[0] (RS256, header token_header_256_1), + multi_key_set[1] (RS256, header token_header_256_2), and + multi_key_set[2] (RS384, header token_header_384), respectively. + """ key_for_256_1 = multi_key_set[0] key_for_256_2 = multi_key_set[1] key_for_384 = multi_key_set[2] @@ -503,7 +757,15 @@ def valid_tokens_no_kid( token_payload: dict[str, Any], token_header_384_no_kid: dict[str, Any], ) -> tuple[str, str, str]: - """Generate valid tokens for each key in the multi-key set without a kid.""" + """Generate valid tokens for each key in the multi-key set without a kid. + + Generate three valid JWTs signed by the three keys in multi_key_set, with + headers that omit the `kid`. + + Returns: + tuple[str, str, str]: Tuple of JWT strings in order (RS256 signed with + first key, RS256 signed with second key, RS384 signed with third key). + """ key_for_256_1 = multi_key_set[0] key_for_256_2 = multi_key_set[1] key_for_384 = multi_key_set[2] @@ -530,7 +792,23 @@ def valid_tokens_no_kid( def multi_key_signing_server( mocker: MockerFixture, multi_key_set: list[dict[str, Any]] ) -> Any: - """Multi-key signing server.""" + """Multi-key signing server. + + Builds a mocked JWKS HTTP server that serves a multi-key key set. + + Creates and returns a mock aiohttp signing-keys server wired to the + provided `multi_key_set` and configured to advertise algorithms ["RS256", + "RS256", "RS384"]. + + Parameters: + mocker: pytest-mock MockerFixture used to patch aiohttp client behavior. + multi_key_set (list[dict[str, Any]]): List of JWK dictionaries to be + served by the mock JWKS endpoint. + + Returns: + A mock object that simulates an aiohttp client/session which, when + queried, yields a response containing the configured JWKs. + """ return make_signing_server(mocker, multi_key_set, ["RS256", "RS256", "RS384"]) diff --git a/tests/unit/authentication/test_k8s.py b/tests/unit/authentication/test_k8s.py index 72b314df8..6abcad5a2 100644 --- a/tests/unit/authentication/test_k8s.py +++ b/tests/unit/authentication/test_k8s.py @@ -35,7 +35,23 @@ def __init__( uid: Optional[str] = None, groups: Optional[list[str]] = None, ) -> None: - """Init function.""" + """Init function. + + Initialize a mock Kubernetes response status representing + authentication and authorization results. + + Parameters: + authenticated (Optional[bool]): Whether the token was + authenticated; when True, `user` is populated. + allowed (Optional[bool]): Whether the action is authorized (subject + access review result). + username (Optional[str]): Username to set on the created + `MockK8sUser` when `authenticated` is True. + uid (Optional[str]): User UID to set on the created `MockK8sUser` + when `authenticated` is True. + groups (Optional[list[str]]): Group list to set on the created + `MockK8sUser` when `authenticated` is True. + """ self.authenticated = authenticated self.allowed = allowed self.user: Optional[MockK8sUser] @@ -57,7 +73,16 @@ def __init__( uid: Optional[str] = None, groups: Optional[list[str]] = None, ) -> None: - """Init function.""" + """Init function. + + Create a mock Kubernetes user holding identity attributes. + + Parameters: + username (Optional[str]): The user's username, or None if not provided. + uid (Optional[str]): The user's unique identifier, or None if not provided. + groups (Optional[list[str]]): List of groups the user belongs + to, or None if not provided. + """ self.username = username self.uid = uid self.groups = groups @@ -77,7 +102,17 @@ def __init__( uid: Optional[str] = None, groups: Optional[list[str]] = None, ) -> None: - """Init function.""" + """Init function. + + Initialize a mock Kubernetes API response wrapper containing a status object. + + Parameters: + authenticated (Optional[bool]): Whether the token was authenticated; use None to omit. + allowed (Optional[bool]): Whether the action is authorized; use None to omit. + username (Optional[str]): Username of the authenticated user, if any. + uid (Optional[str]): User ID of the authenticated user, if any. + groups (Optional[list[str]]): Groups the authenticated user belongs to, if any. + """ self.status = MockK8sResponseStatus( authenticated, allowed, username, uid, groups ) diff --git a/tests/unit/authentication/test_noop_with_token.py b/tests/unit/authentication/test_noop_with_token.py index 1fc17ec71..9d08a87a3 100644 --- a/tests/unit/authentication/test_noop_with_token.py +++ b/tests/unit/authentication/test_noop_with_token.py @@ -89,7 +89,15 @@ async def test_noop_with_token_auth_dependency_no_token() -> None: async def test_noop_with_token_auth_dependency_no_bearer() -> None: - """Test the NoopWithTokenAuthDependency class with no token.""" + """Test the NoopWithTokenAuthDependency class with no token. + + Verify that NoopWithTokenAuthDependency raises an HTTPException when the + Authorization header does not contain a Bearer token. + + Asserts the exception has status code 401 and that the detail contains: + - response: "Missing or invalid credentials provided by client" + - cause: "No token found in Authorization header" + """ dependency = NoopWithTokenAuthDependency() # Create a mock request without token diff --git a/tests/unit/authentication/test_rh_identity.py b/tests/unit/authentication/test_rh_identity.py index ef0fe2841..247119d7f 100644 --- a/tests/unit/authentication/test_rh_identity.py +++ b/tests/unit/authentication/test_rh_identity.py @@ -16,7 +16,18 @@ @pytest.fixture def user_identity_data() -> dict: - """Fixture providing valid User identity data.""" + """Fixture providing valid User identity data. + + Provide a valid Red Hat identity payload for a User, suitable for unit tests. + + Returns: + identity_data (dict): A dictionary with two top-level keys: + - "identity": contains "account_number", "org_id", "type" (set to + "User"), and "user" (with "user_id", "username", "is_org_admin"). + - "entitlements": maps service names (e.g., "rhel", "ansible", + "openshift") to entitlement objects with "is_entitled" and + "is_trial". + """ return { "identity": { "account_number": "123", @@ -38,7 +49,21 @@ def user_identity_data() -> dict: @pytest.fixture def system_identity_data() -> dict: - """Fixture providing valid System identity data.""" + """Fixture providing valid System identity data. + + Provide a sample System identity payload used by tests. + + Returns: + dict: A System identity dictionary with the following shape: + - identity: { + "account_number": str, + "org_id": str, + "type": "System", + "system": {"cn": str} + } + - entitlements: mapping of product names to entitlement objects, e.g. + {"rhel": {"is_entitled": bool, "is_trial": bool}} + """ return { "identity": { "account_number": "123", @@ -53,13 +78,35 @@ def system_identity_data() -> dict: def create_auth_header(identity_data: dict) -> str: - """Helper to create base64-encoded x-rh-identity header value.""" + """Helper to create base64-encoded x-rh-identity header value. + + Create a base64-encoded string suitable for use as an x-rh-identity header from identity data. + + Parameters: + identity_data (dict): Identity payload (serializable to JSON) + containing identity, user/system fields, and optional entitlements. + + Returns: + header_value (str): Base64-encoded JSON string representing the provided identity data. + """ json_str = json.dumps(identity_data) return base64.b64encode(json_str.encode("utf-8")).decode("utf-8") def create_request_with_header(header_value: Optional[str]) -> Request: - """Helper to create mock Request with x-rh-identity header.""" + """Helper to create mock Request with x-rh-identity header. + + Create a mock FastAPI Request with an `x-rh-identity` header for tests. + + Parameters: + header_value (Optional[str]): Base64-encoded identity header value to + set. If `None` or empty, the returned request will have no headers. + + Returns: + Request: A mocked Request object whose `headers` contains + `{"x-rh-identity": header_value}` when a value is provided, or an empty + dict otherwise. + """ request = Mock(spec=Request) request.headers = {"x-rh-identity": header_value} if header_value else {} return request @@ -139,7 +186,16 @@ def test_validate_entitlements( should_raise: bool, expected_error: Optional[str], ) -> None: - """Test validate_entitlements with various requirement configurations.""" + """Test validate_entitlements with various requirement configurations. + + Verify that RHIdentityData.validate_entitlements enforces required entitlements. + + Creates an RHIdentityData instance from the provided identity + dictionary and required_entitlements; asserts that calling + validate_entitlements raises an HTTPException with status code 403 + containing expected_error when should_raise is True, and does not raise + when should_raise is False. + """ rh_identity = RHIdentityData(user_identity_data, required_entitlements) if should_raise: