diff --git a/docs/cli/README.md b/docs/cli/README.md index b9fdd45..61b2512 100644 --- a/docs/cli/README.md +++ b/docs/cli/README.md @@ -454,8 +454,14 @@ KSEF_CLI_TOKEN_STORE_KEY= KSEF_CLI_ALLOW_INSECURE_TOKEN_STORE=1 ``` +- Gdy CLI faktycznie uzyje plaintext fallback, wypisuje jawne ostrzezenie o niezabezpieczonym zapisie tokenow. - Na Windows plaintext fallback jest zablokowany nawet po ustawieniu tej zmiennej; uzyj keyringa albo fallbacku szyfrowanego. - Gdy keyring jest obecny, ale backend zwraca blad, CLI automatycznie przechodzi na dostepny fallback (`KSEF_CLI_TOKEN_STORE_KEY` lub awaryjny plaintext poza Windows). +- `ksef health check` oraz diagnostyka preflight pokazuja aktualny tryb polityki token-store jako jedno z: + - `keyring` + - `encrypted-fallback` + - `plaintext-fallback` + - `unavailable` Lokalizacja token fallback: - Windows: `%LOCALAPPDATA%/ksef-cli/tokens.json` diff --git a/src/ksef_client/cli/auth/keyring_store.py b/src/ksef_client/cli/auth/keyring_store.py index 4b1020e..d51cff9 100644 --- a/src/ksef_client/cli/auth/keyring_store.py +++ b/src/ksef_client/cli/auth/keyring_store.py @@ -7,6 +7,7 @@ import os import time import uuid +import warnings from collections.abc import Iterator from contextlib import contextmanager, suppress from pathlib import Path @@ -54,6 +55,10 @@ class _KeyringError(Exception): _ALLOW_INSECURE_FALLBACK_ENV = "KSEF_CLI_ALLOW_INSECURE_TOKEN_STORE" _TOKEN_STORE_KEY_ENV = "KSEF_CLI_TOKEN_STORE_KEY" _ENCRYPTION_MODE = "fernet-v1" +_PLAINTEXT_FALLBACK_WARNING = ( + "Using plaintext token fallback storage. Tokens are stored unencrypted in tokens.json." +) +_PLAINTEXT_WARNING_EMITTED = False class _KeyringBackend(Protocol): @@ -106,6 +111,24 @@ def _fallback_mode() -> str | None: return None +def get_token_store_mode() -> str: + if _KEYRING_AVAILABLE and _keyring is not None: + return "keyring" + if _encrypted_fallback_enabled(): + return "encrypted-fallback" + if _plaintext_fallback_allowed(): + return "plaintext-fallback" + return "unavailable" + + +def _warn_plaintext_fallback_used(mode: str | None) -> None: + global _PLAINTEXT_WARNING_EMITTED + if mode != "insecure-plaintext" or _PLAINTEXT_WARNING_EMITTED: + return + warnings.warn(_PLAINTEXT_FALLBACK_WARNING, UserWarning, stacklevel=3) + _PLAINTEXT_WARNING_EMITTED = True + + def _fallback_cipher() -> Any: key = _token_store_key() if key is None or _FernetClass is None: @@ -271,6 +294,7 @@ def save_tokens(profile: str, access_token: str, refresh_token: str) -> None: mode = _fallback_mode() if mode is None: _raise_no_secure_store() + _warn_plaintext_fallback_used(mode) encoded_tokens = _encode_tokens( access_token=access_token, refresh_token=refresh_token, mode=mode ) @@ -315,7 +339,9 @@ def get_tokens(profile: str) -> tuple[str, str] | None: if _fallback_mode() is None: return None - if _fallback_mode() is not None: + mode = _fallback_mode() + if mode is not None: + _warn_plaintext_fallback_used(mode) payload = _load_fallback_tokens() profile_data = payload.get(profile) if not isinstance(profile_data, dict): diff --git a/src/ksef_client/cli/commands/health_cmd.py b/src/ksef_client/cli/commands/health_cmd.py index 263a3f2..660dc24 100644 --- a/src/ksef_client/cli/commands/health_cmd.py +++ b/src/ksef_client/cli/commands/health_cmd.py @@ -6,6 +6,7 @@ from ksef_client.exceptions import KsefApiError, KsefHttpError, KsefRateLimitError +from ..auth.keyring_store import get_token_store_mode from ..auth.manager import resolve_base_url from ..context import profile_label, require_context, require_profile from ..errors import CliError @@ -16,6 +17,27 @@ app = typer.Typer(help="Run connectivity and diagnostics checks.") +def _append_token_store_check(result: dict[str, object]) -> None: + checks_obj = result.get("checks") + if not isinstance(checks_obj, list): + checks_obj = [] + result["checks"] = checks_obj + + checks = checks_obj + for item in checks: + if isinstance(item, dict) and item.get("name") == "token_store": + return + + mode = get_token_store_mode() + checks.append( + { + "name": "token_store", + "status": "WARN" if mode in {"plaintext-fallback", "unavailable"} else "PASS", + "message": mode, + } + ) + + def _render_error(ctx: typer.Context, command: str, exc: Exception) -> None: cli_ctx = require_context(ctx) renderer = get_renderer(cli_ctx) @@ -89,6 +111,7 @@ def health_check( check_auth=check_auth, check_certs=check_certs, ) + _append_token_store_check(result) except Exception as exc: _render_error(ctx, "health.check", exc) renderer.success(command="health.check", profile=profile, data=result) diff --git a/src/ksef_client/cli/diagnostics/checks.py b/src/ksef_client/cli/diagnostics/checks.py index 27411b5..b30a078 100644 --- a/src/ksef_client/cli/diagnostics/checks.py +++ b/src/ksef_client/cli/diagnostics/checks.py @@ -2,7 +2,7 @@ from typing import Any -from ..auth.keyring_store import get_tokens +from ..auth.keyring_store import get_token_store_mode, get_tokens from ..config.loader import load_config @@ -19,6 +19,7 @@ def run_preflight(profile: str | None = None) -> dict[str, Any]: context_type = selected_cfg.context_type if selected_cfg else "" context_value = selected_cfg.context_value if selected_cfg else "" has_tokens = bool(get_tokens(selected_profile)) if selected_profile else False + token_store_mode = get_token_store_mode() if selected_profile is None: profile_status = "WARN" @@ -75,6 +76,11 @@ def run_preflight(profile: str | None = None) -> dict[str, Any]: ) ), }, + { + "name": "token_store", + "status": "PASS", + "message": token_store_mode, + }, ] overall = "PASS" if all(item["status"] == "PASS" for item in checks) else "WARN" diff --git a/tests/cli/integration/test_health_check.py b/tests/cli/integration/test_health_check.py index a717b6a..da7dfb9 100644 --- a/tests/cli/integration/test_health_check.py +++ b/tests/cli/integration/test_health_check.py @@ -46,3 +46,54 @@ def test_health_check_error_exit(runner, monkeypatch) -> None: ) result = runner.invoke(app, ["health", "check", "--check-auth"]) assert result.exit_code == int(ExitCode.AUTH_ERROR) + + +def test_health_check_json_includes_token_store_mode(runner, monkeypatch) -> None: + monkeypatch.setattr( + health_cmd, + "run_health_check", + lambda **kwargs: {"overall": "PASS", "checks": [{"name": "base_url", "status": "PASS"}]}, + ) + monkeypatch.setattr(health_cmd, "get_token_store_mode", lambda: "plaintext-fallback") + + result = runner.invoke(app, ["--json", "health", "check"]) + assert result.exit_code == 0 + payload = _json_output(result.stdout) + token_store_checks = [c for c in payload["data"]["checks"] if c["name"] == "token_store"] + assert token_store_checks == [ + {"name": "token_store", "status": "WARN", "message": "plaintext-fallback"} + ] + + +def test_health_check_does_not_duplicate_existing_token_store_check(runner, monkeypatch) -> None: + monkeypatch.setattr( + health_cmd, + "run_health_check", + lambda **kwargs: { + "overall": "PASS", + "checks": [{"name": "token_store", "status": "PASS", "message": "keyring"}], + }, + ) + monkeypatch.setattr(health_cmd, "get_token_store_mode", lambda: "plaintext-fallback") + + result = runner.invoke(app, ["--json", "health", "check"]) + assert result.exit_code == 0 + payload = _json_output(result.stdout) + token_store_checks = [c for c in payload["data"]["checks"] if c["name"] == "token_store"] + assert token_store_checks == [{"name": "token_store", "status": "PASS", "message": "keyring"}] + + +def test_health_check_replaces_non_list_checks_with_token_store_check(runner, monkeypatch) -> None: + monkeypatch.setattr( + health_cmd, + "run_health_check", + lambda **kwargs: {"overall": "PASS", "checks": {"name": "base_url", "status": "PASS"}}, + ) + monkeypatch.setattr(health_cmd, "get_token_store_mode", lambda: "keyring") + + result = runner.invoke(app, ["--json", "health", "check"]) + assert result.exit_code == 0 + payload = _json_output(result.stdout) + assert payload["data"]["checks"] == [ + {"name": "token_store", "status": "PASS", "message": "keyring"} + ] diff --git a/tests/cli/unit/test_keyring_store.py b/tests/cli/unit/test_keyring_store.py index d126c9e..94df357 100644 --- a/tests/cli/unit/test_keyring_store.py +++ b/tests/cli/unit/test_keyring_store.py @@ -6,6 +6,7 @@ import threading import time import types +import warnings import pytest @@ -471,3 +472,57 @@ def delete_password(self, service: str, key: str) -> None: assert reloaded._KEYRING_AVAILABLE is True finally: importlib.reload(original_module) + + +@pytest.mark.parametrize( + ("keyring_available", "keyring_backend", "store_key", "allow_insecure", "os_name", "expected"), + [ + (True, object(), "", "", "posix", "keyring"), + (False, None, "my-secret-passphrase", "", "posix", "encrypted-fallback"), + (False, None, "", "1", "posix", "plaintext-fallback"), + (False, None, "", "1", "nt", "unavailable"), + (False, None, "", "", "posix", "unavailable"), + ], +) +def test_keyring_store_mode_detection( + monkeypatch, + keyring_available: bool, + keyring_backend: object | None, + store_key: str, + allow_insecure: str, + os_name: str, + expected: str, +) -> None: + monkeypatch.setattr(keyring_store, "_KEYRING_AVAILABLE", keyring_available) + monkeypatch.setattr(keyring_store, "_keyring", keyring_backend) + monkeypatch.setattr(keyring_store.os, "name", os_name, raising=False) + + if store_key: + monkeypatch.setenv(keyring_store._TOKEN_STORE_KEY_ENV, store_key) + else: + monkeypatch.delenv(keyring_store._TOKEN_STORE_KEY_ENV, raising=False) + + if allow_insecure: + monkeypatch.setenv(keyring_store._ALLOW_INSECURE_FALLBACK_ENV, allow_insecure) + else: + monkeypatch.delenv(keyring_store._ALLOW_INSECURE_FALLBACK_ENV, raising=False) + + assert keyring_store.get_token_store_mode() == expected + + +def test_keyring_store_plaintext_fallback_warns_once_on_use(monkeypatch, tmp_path) -> None: + monkeypatch.setattr(keyring_store, "_KEYRING_AVAILABLE", False) + monkeypatch.setattr(keyring_store, "_keyring", None) + monkeypatch.setattr(keyring_store, "cache_dir", lambda: tmp_path) + monkeypatch.setattr(keyring_store.os, "name", "posix", raising=False) + monkeypatch.setenv(keyring_store._ALLOW_INSECURE_FALLBACK_ENV, "1") + monkeypatch.delenv(keyring_store._TOKEN_STORE_KEY_ENV, raising=False) + monkeypatch.setattr(keyring_store, "_PLAINTEXT_WARNING_EMITTED", False) + + with pytest.warns(UserWarning, match="plaintext token fallback"): + keyring_store.save_tokens("demo", "acc", "ref") + + with warnings.catch_warnings(record=True) as recorded: + warnings.simplefilter("always") + keyring_store.get_tokens("demo") + assert len(recorded) == 0