Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions .github/workflows/linting.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: Linting

on:
push:
branches:
- main
pull_request:

permissions:
contents: write

defaults:
run:
working-directory: ./

jobs:
lint-format:
runs-on: ubuntu-latest
name: Reformat Code

steps:
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.event_name == 'pull_request' && github.event.pull_request.head.sha || github.ref }}
fetch-depth: 0

- name: Install poetry
run: |
pipx install poetry

- name: Setup Python
uses: actions/setup-python@v5
with:
python-version-file: 'pyproject.toml'
cache: 'poetry'

- name: Install Dependencies
run: poetry install --no-interaction --sync --all-extras

- name: Run Formatter
run: poetry run ruff format .

- name: Commit Changes
uses: stefanzweifel/git-auto-commit-action@v5
with:
commit_message: 'style: Apply automated code formatting [skip ci]'
commit_options: '--no-verify'
repository: .
commit_user_name: github-actions[bot]
commit_user_email: github-actions[bot]@users.noreply.github.com
commit_author: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1,242 changes: 635 additions & 607 deletions poetry.lock

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ types-python-dateutil = ">= 2.8.19.14"
mypy = ">= 1.5"
testcontainers = "3.7.1"
python-dotenv = "1.0.1"
ruff = "^0.11.5"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down Expand Up @@ -89,3 +90,17 @@ disable_error_code = ["import-untyped"]

[tool.flake8]
max-line-length = 99

[tool.ruff]
line-length = 130
fix = true
target-version = "py39"
exclude = [
"zitadel_client/api/*",
"zitadel_client/models/*"
]

[tool.ruff.lint]
select = ["E", "F", "I", "B", "C", "N", "Q", "S", "T"]
extend-select = ["A"]
ignore = ["S101"] # Ignore assert statements
4 changes: 2 additions & 2 deletions spec/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

@pytest.fixture(scope="session", autouse=True)
def load_env() -> None:
"""Load the .env file for the entire test session."""
load_dotenv()
"""Load the .env file for the entire test session."""
load_dotenv()
29 changes: 12 additions & 17 deletions spec/sdk_test_using_client_credentials_authentication_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import pytest

import zitadel_client as zitadel
from zitadel_client.auth.client_credentials_authenticator import ClientCredentialsAuthenticator
from zitadel_client.auth.client_credentials_authenticator import (
ClientCredentialsAuthenticator,
)


@pytest.fixture
Expand Down Expand Up @@ -33,42 +35,35 @@ def user_id(client_id: str, client_secret: str, base_url: str) -> str | None:
response = client.users.add_human_user(
body=zitadel.models.V2AddHumanUserRequest(
username=uuid.uuid4().hex,
profile=zitadel.models.V2SetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg]
email=zitadel.models.V2SetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@caos.ag")
profile=zitadel.models.V2SetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg]
email=zitadel.models.V2SetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@caos.ag"),
)
)
print("User created:", response)
return response.user_id
except Exception as e:
pytest.fail(f"Exception while creating user: {e}")


def test_should_deactivate_and_reactivate_user_with_valid_token(user_id: str, client_id: str, client_secret: str, base_url: str) -> None:
def test_should_deactivate_and_reactivate_user_with_valid_token(
user_id: str, client_id: str, client_secret: str, base_url: str
) -> None:
"""Test to (de)activate the user with a valid token."""
with zitadel.Zitadel(ClientCredentialsAuthenticator.builder(base_url, client_id, client_secret).build()) as client:
try:
deactivate_response = client.users.deactivate_user(user_id=user_id)
print("User deactivated:", deactivate_response)
assert deactivate_response is not None, "Deactivation response is None"

reactivate_response = client.users.reactivate_user(user_id=user_id)
print("User reactivated:", reactivate_response)
# Adjust based on actual response format
# assert reactivate_response["status"] == "success"
assert reactivate_response is not None, "Reactivation response is None"
except Exception as e:
pytest.fail(f"Exception when calling deactivate_user or reactivate_user with valid token: {e}")


def test_should_not_deactivate_or_reactivate_user_with_invalid_token(user_id: str, base_url: str) -> None:
"""Test to attempt (de)activating the user with an invalid token."""
with zitadel.Zitadel(ClientCredentialsAuthenticator.builder(base_url, "id", "secret").build()) as client:
try:
with pytest.raises(Exception, match="Failed to refresh token: invalid_client: client not found"):
client.users.deactivate_user(user_id=user_id)
pytest.fail("Expected exception when deactivating user with invalid token, but got response.")
except Exception as e:
print("Caught expected UnauthorizedException:", e)

try:
with pytest.raises(Exception, match="Failed to refresh token: invalid_client: client not found"):
client.users.reactivate_user(user_id=user_id)
pytest.fail("Expected exception when reactivating user with invalid token, but got response.")
except Exception as e:
print("Caught expected UnauthorizedException:", e)
29 changes: 9 additions & 20 deletions spec/sdk_test_using_personal_access_token_authentication_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import pytest

import zitadel_client as zitadel
from zitadel_client.auth.personal_access_token_authenticator import PersonalAccessTokenAuthenticator
from zitadel_client.auth.personal_access_token_authenticator import (
PersonalAccessTokenAuthenticator,
)
from zitadel_client.exceptions import UnauthorizedException


Expand Down Expand Up @@ -34,11 +36,10 @@ def user_id(valid_token: str, base_url: str) -> str | None:
response = client.users.add_human_user(
body=zitadel.models.V2AddHumanUserRequest(
username=uuid.uuid4().hex,
profile=zitadel.models.V2SetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg]
email=zitadel.models.V2SetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@caos.ag")
profile=zitadel.models.V2SetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg]
email=zitadel.models.V2SetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@caos.ag"),
)
)
print("User created:", response)
return response.user_id
except Exception as e:
pytest.fail(f"Exception while creating user: {e}")
Expand All @@ -49,31 +50,19 @@ def test_should_deactivate_and_reactivate_user_with_valid_token(user_id: str, va
with zitadel.Zitadel(PersonalAccessTokenAuthenticator(base_url, valid_token)) as client:
try:
deactivate_response = client.users.deactivate_user(user_id=user_id)
print("User deactivated:", deactivate_response)
assert deactivate_response is not None, "Deactivation response is None"

reactivate_response = client.users.reactivate_user(user_id=user_id)
print("User reactivated:", reactivate_response)
# Adjust based on actual response format
# assert reactivate_response["status"] == "success"
assert reactivate_response is not None, "Reactivation response is None"
except Exception as e:
pytest.fail(f"Exception when calling deactivate_user or reactivate_user with valid token: {e}")


def test_should_not_deactivate_or_reactivate_user_with_invalid_token(user_id: str, invalid_token: str, base_url: str) -> None:
"""Test to attempt (de)activating the user with an invalid token."""
with zitadel.Zitadel(PersonalAccessTokenAuthenticator(base_url, invalid_token)) as client:
try:
with pytest.raises(UnauthorizedException):
client.users.deactivate_user(user_id=user_id)
pytest.fail("Expected exception when deactivating user with invalid token, but got response.")
except UnauthorizedException as e:
print("Caught expected UnauthorizedException:", e)
except Exception as e:
pytest.fail(f"Invalid exception when calling the function: {e}")

try:
with pytest.raises(UnauthorizedException):
client.users.reactivate_user(user_id=user_id)
pytest.fail("Expected exception when reactivating user with invalid token, but got response.")
except UnauthorizedException as e:
print("Caught expected UnauthorizedException:", e)
except Exception as e:
pytest.fail(f"Invalid exception when calling the function: {e}")
11 changes: 4 additions & 7 deletions spec/sdk_test_using_web_token_authentication_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ def user_id(key_file: str, base_url: str) -> str | None:
response = client.users.add_human_user(
body=zitadel.models.V2AddHumanUserRequest(
username=uuid.uuid4().hex,
profile=zitadel.models.V2SetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg]
email=zitadel.models.V2SetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@caos.ag")
profile=zitadel.models.V2SetHumanProfile(given_name="John", family_name="Doe"), # type: ignore[call-arg]
email=zitadel.models.V2SetHumanEmail(email=f"johndoe{uuid.uuid4().hex}@caos.ag"),
)
)
print("User created:", response)
return response.user_id
except Exception as e:
pytest.fail(f"Exception while creating user: {e}")
Expand All @@ -47,11 +46,9 @@ def test_should_deactivate_and_reactivate_user_with_valid_token(user_id: str, ke
with zitadel.Zitadel(WebTokenAuthenticator.from_json(base_url, key_file)) as client:
try:
deactivate_response = client.users.deactivate_user(user_id=user_id)
print("User deactivated:", deactivate_response)
assert deactivate_response is not None, "Deactivation response is None"

reactivate_response = client.users.reactivate_user(user_id=user_id)
print("User reactivated:", reactivate_response)
# Adjust based on actual response format
# assert reactivate_response["status"] == "success"
assert reactivate_response is not None, "Reactivation response is None"
except Exception as e:
pytest.fail(f"Exception when calling deactivate_user or reactivate_user with valid token: {e}")
53 changes: 33 additions & 20 deletions test/auth/test_client_credentials_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,42 @@
from datetime import datetime, timezone

from test.auth.test_oauth_authenticator import OAuthAuthenticatorTest
from zitadel_client.auth.client_credentials_authenticator import ClientCredentialsAuthenticator
from zitadel_client.auth.client_credentials_authenticator import (
ClientCredentialsAuthenticator,
)


class ClientCredentialsAuthenticatorTest(OAuthAuthenticatorTest):
"""
Test for ClientCredentialsAuthenticator to verify token refresh functionality.
Extends the base OAuthAuthenticatorTest class.
"""
"""
Test for ClientCredentialsAuthenticator to verify token refresh functionality.
Extends the base OAuthAuthenticatorTest class.
"""

def test_refresh_token(self) -> None:
time.sleep(20)
def test_refresh_token(self) -> None:
time.sleep(20)

assert self.oauth_host is not None
authenticator = ClientCredentialsAuthenticator.builder(self.oauth_host, "dummy-client", "dummy-secret") \
.scopes("openid", "foo") \
.build()
assert self.oauth_host is not None
authenticator = (
ClientCredentialsAuthenticator.builder(self.oauth_host, "dummy-client", "dummy-secret")
.scopes("openid", "foo")
.build()
)

self.assertTrue(authenticator.get_auth_token(), "Access token should not be empty")
token = authenticator.refresh_token()
self.assertEqual({"Authorization": "Bearer " + token.access_token}, authenticator.get_auth_headers())
self.assertTrue(token.access_token, "Access token should not be null")
self.assertTrue(token.expires_at > datetime.now(timezone.utc), "Token expiry should be in the future")
self.assertEqual(token.access_token, authenticator.get_auth_token())
self.assertEqual(self.oauth_host, authenticator.get_host())
self.assertNotEqual(authenticator.refresh_token().access_token, authenticator.refresh_token().access_token,
"Two refreshToken calls should produce different tokens")
self.assertTrue(authenticator.get_auth_token(), "Access token should not be empty")
token = authenticator.refresh_token()
self.assertEqual(
{"Authorization": "Bearer " + token.access_token},
authenticator.get_auth_headers(),
)
self.assertTrue(token.access_token, "Access token should not be null")
self.assertTrue(
token.expires_at > datetime.now(timezone.utc),
"Token expiry should be in the future",
)
self.assertEqual(token.access_token, authenticator.get_auth_token())
self.assertEqual(self.oauth_host, authenticator.get_host())
self.assertNotEqual(
authenticator.refresh_token().access_token,
authenticator.refresh_token().access_token,
"Two refreshToken calls should produce different tokens",
)
18 changes: 9 additions & 9 deletions test/auth/test_no_auth_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@


class NoAuthAuthenticatorTest(unittest.TestCase):
def test_returns_empty_headers_and_default_host(self) -> None:
auth = NoAuthAuthenticator()
self.assertEqual({}, auth.get_auth_headers())
self.assertEqual("http://localhost", auth.get_host())

def test_returns_empty_headers_and_custom_host(self) -> None:
auth = NoAuthAuthenticator("https://custom-host")
self.assertEqual({}, auth.get_auth_headers())
self.assertEqual("https://custom-host", auth.get_host())
def test_returns_empty_headers_and_default_host(self) -> None:
auth = NoAuthAuthenticator()
self.assertEqual({}, auth.get_auth_headers())
self.assertEqual("http://localhost", auth.get_host())

def test_returns_empty_headers_and_custom_host(self) -> None:
auth = NoAuthAuthenticator("https://custom-host")
self.assertEqual({}, auth.get_auth_headers())
self.assertEqual("https://custom-host", auth.get_host())
52 changes: 26 additions & 26 deletions test/auth/test_oauth_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,29 @@


class OAuthAuthenticatorTest(unittest.TestCase):
"""
Base test class for OAuth authenticators.

This class starts a Docker container running the mock OAuth2 server
(ghcr.io/navikt/mock-oauth2-server:2.1.10) before any tests run and stops it after all tests.
It sets the class variable `oauth_host` to the container’s accessible URL.

The container is configured to wait for an HTTP response from the "/" endpoint
with a status code of 405, using HttpWaitStrategy.
"""
oauth_host: str | None = None
mock_oauth2_server: DockerContainer = None

@classmethod
def setUpClass(cls) -> None:
cls.mock_oauth2_server = DockerContainer("ghcr.io/navikt/mock-oauth2-server:2.1.10") \
.with_exposed_ports(8080)
cls.mock_oauth2_server.start()
host = cls.mock_oauth2_server.get_container_host_ip()
port = cls.mock_oauth2_server.get_exposed_port(8080)
cls.oauth_host = f"http://{host}:{port}"

@classmethod
def tearDownClass(cls) -> None:
if cls.mock_oauth2_server is not None:
cls.mock_oauth2_server.stop()
"""
Base test class for OAuth authenticators.

This class starts a Docker container running the mock OAuth2 server
(ghcr.io/navikt/mock-oauth2-server:2.1.10) before any tests run and stops it after all tests.
It sets the class variable `oauth_host` to the container’s accessible URL.

The container is configured to wait for an HTTP response from the "/" endpoint
with a status code of 405, using HttpWaitStrategy.
"""

oauth_host: str | None = None
mock_oauth2_server: DockerContainer = None

@classmethod
def setUpClass(cls) -> None:
cls.mock_oauth2_server = DockerContainer("ghcr.io/navikt/mock-oauth2-server:2.1.10").with_exposed_ports(8080)
cls.mock_oauth2_server.start()
host = cls.mock_oauth2_server.get_container_host_ip()
port = cls.mock_oauth2_server.get_exposed_port(8080)
cls.oauth_host = f"http://{host}:{port}"

@classmethod
def tearDownClass(cls) -> None:
if cls.mock_oauth2_server is not None:
cls.mock_oauth2_server.stop()
12 changes: 7 additions & 5 deletions test/auth/test_personal_access_authenticator.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import unittest

from zitadel_client.auth.personal_access_token_authenticator import PersonalAccessTokenAuthenticator
from zitadel_client.auth.personal_access_token_authenticator import (
PersonalAccessTokenAuthenticator,
)


class PersonalAccessTokenAuthenticatorTest(unittest.TestCase):
def test_returns_expected_headers_and_host(self) -> None:
auth = PersonalAccessTokenAuthenticator("https://api.example.com", "my-secret-token")
self.assertEqual({"Authorization": "Bearer my-secret-token"}, auth.get_auth_headers())
self.assertEqual("https://api.example.com", auth.get_host())
def test_returns_expected_headers_and_host(self) -> None:
auth = PersonalAccessTokenAuthenticator("https://api.example.com", "my-secret-token")
self.assertEqual({"Authorization": "Bearer my-secret-token"}, auth.get_auth_headers())
self.assertEqual("https://api.example.com", auth.get_host())
Loading