Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,14 @@ KSEF_CLI_TOKEN_STORE_KEY=<TWOJ_KLUCZ>
KSEF_CLI_ALLOW_INSECURE_TOKEN_STORE=1
```

- Gdy CLI faktycznie uzyje plaintext fallback, wypisuje jawne ostrzezenie o niezabezpieczonym zapisie tokenow.
- Na Windows plaintext fallback jest zablokowany nawet po ustawieniu tej zmiennej; uzyj keyringa albo fallbacku szyfrowanego.
- Gdy keyring jest obecny, ale backend zwraca blad, CLI automatycznie przechodzi na dostepny fallback (`KSEF_CLI_TOKEN_STORE_KEY` lub awaryjny plaintext poza Windows).
- `ksef health check` oraz diagnostyka preflight pokazuja aktualny tryb polityki token-store jako jedno z:
- `keyring`
- `encrypted-fallback`
- `plaintext-fallback`
- `unavailable`

Lokalizacja token fallback:
- Windows: `%LOCALAPPDATA%/ksef-cli/tokens.json`
Expand Down
28 changes: 27 additions & 1 deletion src/ksef_client/cli/auth/keyring_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import time
import uuid
import warnings
from collections.abc import Iterator
from contextlib import contextmanager, suppress
from pathlib import Path
Expand Down Expand Up @@ -54,6 +55,10 @@ class _KeyringError(Exception):
_ALLOW_INSECURE_FALLBACK_ENV = "KSEF_CLI_ALLOW_INSECURE_TOKEN_STORE"
_TOKEN_STORE_KEY_ENV = "KSEF_CLI_TOKEN_STORE_KEY"
_ENCRYPTION_MODE = "fernet-v1"
_PLAINTEXT_FALLBACK_WARNING = (
"Using plaintext token fallback storage. Tokens are stored unencrypted in tokens.json."
)
_PLAINTEXT_WARNING_EMITTED = False


class _KeyringBackend(Protocol):
Expand Down Expand Up @@ -106,6 +111,24 @@ def _fallback_mode() -> str | None:
return None


def get_token_store_mode() -> str:
if _KEYRING_AVAILABLE and _keyring is not None:
return "keyring"
if _encrypted_fallback_enabled():
return "encrypted-fallback"
if _plaintext_fallback_allowed():
return "plaintext-fallback"
return "unavailable"


def _warn_plaintext_fallback_used(mode: str | None) -> None:
global _PLAINTEXT_WARNING_EMITTED
if mode != "insecure-plaintext" or _PLAINTEXT_WARNING_EMITTED:
return
warnings.warn(_PLAINTEXT_FALLBACK_WARNING, UserWarning, stacklevel=3)
_PLAINTEXT_WARNING_EMITTED = True


def _fallback_cipher() -> Any:
key = _token_store_key()
if key is None or _FernetClass is None:
Expand Down Expand Up @@ -271,6 +294,7 @@ def save_tokens(profile: str, access_token: str, refresh_token: str) -> None:
mode = _fallback_mode()
if mode is None:
_raise_no_secure_store()
_warn_plaintext_fallback_used(mode)
encoded_tokens = _encode_tokens(
access_token=access_token, refresh_token=refresh_token, mode=mode
)
Expand Down Expand Up @@ -315,7 +339,9 @@ def get_tokens(profile: str) -> tuple[str, str] | None:
if _fallback_mode() is None:
return None

if _fallback_mode() is not None:
mode = _fallback_mode()
if mode is not None:
_warn_plaintext_fallback_used(mode)
payload = _load_fallback_tokens()
profile_data = payload.get(profile)
if not isinstance(profile_data, dict):
Expand Down
23 changes: 23 additions & 0 deletions src/ksef_client/cli/commands/health_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from ksef_client.exceptions import KsefApiError, KsefHttpError, KsefRateLimitError

from ..auth.keyring_store import get_token_store_mode
from ..auth.manager import resolve_base_url
from ..context import profile_label, require_context, require_profile
from ..errors import CliError
Expand All @@ -16,6 +17,27 @@
app = typer.Typer(help="Run connectivity and diagnostics checks.")


def _append_token_store_check(result: dict[str, object]) -> None:
checks_obj = result.get("checks")
if not isinstance(checks_obj, list):
checks_obj = []
result["checks"] = checks_obj

checks = checks_obj
for item in checks:
if isinstance(item, dict) and item.get("name") == "token_store":
return

mode = get_token_store_mode()
checks.append(
{
"name": "token_store",
"status": "WARN" if mode in {"plaintext-fallback", "unavailable"} else "PASS",
"message": mode,
}
)


def _render_error(ctx: typer.Context, command: str, exc: Exception) -> None:
cli_ctx = require_context(ctx)
renderer = get_renderer(cli_ctx)
Expand Down Expand Up @@ -89,6 +111,7 @@ def health_check(
check_auth=check_auth,
check_certs=check_certs,
)
_append_token_store_check(result)
except Exception as exc:
_render_error(ctx, "health.check", exc)
renderer.success(command="health.check", profile=profile, data=result)
8 changes: 7 additions & 1 deletion src/ksef_client/cli/diagnostics/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from typing import Any

from ..auth.keyring_store import get_tokens
from ..auth.keyring_store import get_token_store_mode, get_tokens
from ..config.loader import load_config


Expand All @@ -19,6 +19,7 @@ def run_preflight(profile: str | None = None) -> dict[str, Any]:
context_type = selected_cfg.context_type if selected_cfg else ""
context_value = selected_cfg.context_value if selected_cfg else ""
has_tokens = bool(get_tokens(selected_profile)) if selected_profile else False
token_store_mode = get_token_store_mode()

if selected_profile is None:
profile_status = "WARN"
Expand Down Expand Up @@ -75,6 +76,11 @@ def run_preflight(profile: str | None = None) -> dict[str, Any]:
)
),
},
{
"name": "token_store",
"status": "PASS",
"message": token_store_mode,
},
]

overall = "PASS" if all(item["status"] == "PASS" for item in checks) else "WARN"
Expand Down
51 changes: 51 additions & 0 deletions tests/cli/integration/test_health_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,54 @@ def test_health_check_error_exit(runner, monkeypatch) -> None:
)
result = runner.invoke(app, ["health", "check", "--check-auth"])
assert result.exit_code == int(ExitCode.AUTH_ERROR)


def test_health_check_json_includes_token_store_mode(runner, monkeypatch) -> None:
monkeypatch.setattr(
health_cmd,
"run_health_check",
lambda **kwargs: {"overall": "PASS", "checks": [{"name": "base_url", "status": "PASS"}]},
)
monkeypatch.setattr(health_cmd, "get_token_store_mode", lambda: "plaintext-fallback")

result = runner.invoke(app, ["--json", "health", "check"])
assert result.exit_code == 0
payload = _json_output(result.stdout)
token_store_checks = [c for c in payload["data"]["checks"] if c["name"] == "token_store"]
assert token_store_checks == [
{"name": "token_store", "status": "WARN", "message": "plaintext-fallback"}
]


def test_health_check_does_not_duplicate_existing_token_store_check(runner, monkeypatch) -> None:
monkeypatch.setattr(
health_cmd,
"run_health_check",
lambda **kwargs: {
"overall": "PASS",
"checks": [{"name": "token_store", "status": "PASS", "message": "keyring"}],
},
)
monkeypatch.setattr(health_cmd, "get_token_store_mode", lambda: "plaintext-fallback")

result = runner.invoke(app, ["--json", "health", "check"])
assert result.exit_code == 0
payload = _json_output(result.stdout)
token_store_checks = [c for c in payload["data"]["checks"] if c["name"] == "token_store"]
assert token_store_checks == [{"name": "token_store", "status": "PASS", "message": "keyring"}]


def test_health_check_replaces_non_list_checks_with_token_store_check(runner, monkeypatch) -> None:
monkeypatch.setattr(
health_cmd,
"run_health_check",
lambda **kwargs: {"overall": "PASS", "checks": {"name": "base_url", "status": "PASS"}},
)
monkeypatch.setattr(health_cmd, "get_token_store_mode", lambda: "keyring")

result = runner.invoke(app, ["--json", "health", "check"])
assert result.exit_code == 0
payload = _json_output(result.stdout)
assert payload["data"]["checks"] == [
{"name": "token_store", "status": "PASS", "message": "keyring"}
]
55 changes: 55 additions & 0 deletions tests/cli/unit/test_keyring_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import threading
import time
import types
import warnings

import pytest

Expand Down Expand Up @@ -471,3 +472,57 @@ def delete_password(self, service: str, key: str) -> None:
assert reloaded._KEYRING_AVAILABLE is True
finally:
importlib.reload(original_module)


@pytest.mark.parametrize(
("keyring_available", "keyring_backend", "store_key", "allow_insecure", "os_name", "expected"),
[
(True, object(), "", "", "posix", "keyring"),
(False, None, "my-secret-passphrase", "", "posix", "encrypted-fallback"),
(False, None, "", "1", "posix", "plaintext-fallback"),
(False, None, "", "1", "nt", "unavailable"),
(False, None, "", "", "posix", "unavailable"),
],
)
def test_keyring_store_mode_detection(
monkeypatch,
keyring_available: bool,
keyring_backend: object | None,
store_key: str,
allow_insecure: str,
os_name: str,
expected: str,
) -> None:
monkeypatch.setattr(keyring_store, "_KEYRING_AVAILABLE", keyring_available)
monkeypatch.setattr(keyring_store, "_keyring", keyring_backend)
monkeypatch.setattr(keyring_store.os, "name", os_name, raising=False)

if store_key:
monkeypatch.setenv(keyring_store._TOKEN_STORE_KEY_ENV, store_key)
else:
monkeypatch.delenv(keyring_store._TOKEN_STORE_KEY_ENV, raising=False)

if allow_insecure:
monkeypatch.setenv(keyring_store._ALLOW_INSECURE_FALLBACK_ENV, allow_insecure)
else:
monkeypatch.delenv(keyring_store._ALLOW_INSECURE_FALLBACK_ENV, raising=False)

assert keyring_store.get_token_store_mode() == expected


def test_keyring_store_plaintext_fallback_warns_once_on_use(monkeypatch, tmp_path) -> None:
monkeypatch.setattr(keyring_store, "_KEYRING_AVAILABLE", False)
monkeypatch.setattr(keyring_store, "_keyring", None)
monkeypatch.setattr(keyring_store, "cache_dir", lambda: tmp_path)
monkeypatch.setattr(keyring_store.os, "name", "posix", raising=False)
monkeypatch.setenv(keyring_store._ALLOW_INSECURE_FALLBACK_ENV, "1")
monkeypatch.delenv(keyring_store._TOKEN_STORE_KEY_ENV, raising=False)
monkeypatch.setattr(keyring_store, "_PLAINTEXT_WARNING_EMITTED", False)

with pytest.warns(UserWarning, match="plaintext token fallback"):
keyring_store.save_tokens("demo", "acc", "ref")

with warnings.catch_warnings(record=True) as recorded:
warnings.simplefilter("always")
keyring_store.get_tokens("demo")
assert len(recorded) == 0
Loading