Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ options = KsefClientOptions(
proxy=None,
custom_headers={"X-Custom-Header": "value"},
follow_redirects=False,
strict_presigned_url_validation=True,
allowed_presigned_hosts=None,
allow_private_network_presigned_urls=False,
base_qr_url=None,
)
```
Expand Down Expand Up @@ -62,6 +65,18 @@ Opcja zwykle nie jest potrzebna. Włączenie ma uzasadnienie wyłącznie w środ

Domyślnie `True`. Wyłączenie ma uzasadnienie wyłącznie w specyficznych środowiskach testowych (np. z własnym MITM/proxy).

### `strict_presigned_url_validation`

Domyślnie `True`. Dla absolutnych URL używanych z `skip_auth=True` wymusza `https`. Przy wyłączeniu możliwe są URL `http`, ale nadal działa walidacja hosta/IP.

### `allowed_presigned_hosts`

Domyślnie `None` (brak allowlisty). Jeśli ustawione, host pre-signed URL musi pasować dokładnie albo jako subdomena (np. `a.uploads.example.com` pasuje do `uploads.example.com`).

### `allow_private_network_presigned_urls`

Domyślnie `False`. Gdy `False`, blokowane są hosty IP prywatne/link-local/reserved dla żądań `skip_auth=True`. Ustaw `True` wyłącznie w kontrolowanym środowisku.

## Przekazywanie `access_token`

Dostępne są dwa sposoby przekazywania `access_token`:
Expand Down
4 changes: 4 additions & 0 deletions docs/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Obsługa błędów jest oparta o kody HTTP (>= 400). Biblioteka nie interpretuje

## Typy wyjątków

### `ValueError` (walidacja pre-signed URL)

Dla żądań z `skip_auth=True` i absolutnym URL biblioteka wykonuje walidację bezpieczeństwa. W przypadku niespełnienia reguł (np. host `localhost`, loopback/private IP bez opt-in, host poza allowlistą, albo `http` przy `strict_presigned_url_validation=True`) rzucany jest `ValueError` z komunikatem bezpieczeństwa.

### `KsefHttpError`

Bazowy błąd HTTP.
Expand Down
3 changes: 3 additions & 0 deletions src/ksef_client/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ class KsefClientOptions:
custom_headers: dict[str, str] | None = None
follow_redirects: bool = False
verify_ssl: bool = True
strict_presigned_url_validation: bool = True
allowed_presigned_hosts: list[str] | None = None
allow_private_network_presigned_urls: bool = False
user_agent: str = field(default_factory=_default_user_agent)

def normalized_base_url(self) -> str:
Expand Down
79 changes: 77 additions & 2 deletions src/ksef_client/http.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import ipaddress
from dataclasses import dataclass
from typing import Any
from urllib.parse import urlparse

import httpx

Expand All @@ -17,6 +19,75 @@ def _merge_headers(base: dict[str, str], extra: dict[str, str] | None) -> dict[s
return merged


def _is_absolute_http_url(url: str) -> bool:
return url.startswith("http://") or url.startswith("https://")


def _host_allowed(host: str, allowed_hosts: list[str]) -> bool:
normalized_host = host.lower().rstrip(".")
for allowed in allowed_hosts:
normalized_allowed = allowed.lower().strip().rstrip(".")
if not normalized_allowed:
continue
if normalized_host == normalized_allowed:
return True
try:
ipaddress.ip_address(normalized_allowed)
continue
except ValueError:
pass
if normalized_host.endswith("." + normalized_allowed):
return True
return False


def _validate_presigned_url_security(options: KsefClientOptions, url: str) -> None:
parsed = urlparse(url)
host = parsed.hostname
if not host:
raise ValueError("Rejected insecure presigned URL: host is missing.")

normalized_host = host.lower().rstrip(".")
if normalized_host == "localhost" or normalized_host.endswith(".localhost"):
raise ValueError(
"Rejected insecure presigned URL: localhost hosts are not allowed "
"for skip_auth requests."
)

if options.strict_presigned_url_validation and parsed.scheme != "https":
raise ValueError(
"Rejected insecure presigned URL: https is required for skip_auth requests."
)

try:
host_ip = ipaddress.ip_address(normalized_host)
except ValueError:
host_ip = None

if host_ip is not None:
if host_ip.is_loopback:
raise ValueError(
"Rejected insecure presigned URL: loopback addresses are not allowed "
"for skip_auth requests."
)
if (
not options.allow_private_network_presigned_urls
and (host_ip.is_private or host_ip.is_link_local or host_ip.is_reserved)
):
raise ValueError(
"Rejected insecure presigned URL: private, link-local, and reserved "
"IP hosts are blocked for skip_auth requests."
)

if options.allowed_presigned_hosts and not _host_allowed(
normalized_host, options.allowed_presigned_hosts
):
raise ValueError(
"Rejected insecure presigned URL: host is not in allowed_presigned_hosts "
"for skip_auth requests."
)


@dataclass
class HttpResponse:
status_code: int
Expand Down Expand Up @@ -60,8 +131,10 @@ def request(
expected_status: set[int] | None = None,
) -> HttpResponse:
url = path
if not url.startswith("http://") and not url.startswith("https://"):
if not _is_absolute_http_url(url):
url = self._options.normalized_base_url().rstrip("/") + "/" + path.lstrip("/")
elif skip_auth:
_validate_presigned_url_security(self._options, url)

base_headers = {
"User-Agent": self._options.user_agent,
Expand Down Expand Up @@ -166,8 +239,10 @@ async def request(
expected_status: set[int] | None = None,
) -> HttpResponse:
url = path
if not url.startswith("http://") and not url.startswith("https://"):
if not _is_absolute_http_url(url):
url = self._options.normalized_base_url().rstrip("/") + "/" + path.lstrip("/")
elif skip_auth:
_validate_presigned_url_security(self._options, url)

base_headers = {
"User-Agent": self._options.user_agent,
Expand Down
102 changes: 101 additions & 1 deletion tests/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@

from ksef_client.config import KsefClientOptions
from ksef_client.exceptions import KsefApiError, KsefHttpError, KsefRateLimitError
from ksef_client.http import AsyncBaseHttpClient, BaseHttpClient, HttpResponse, _merge_headers
from ksef_client.http import (
AsyncBaseHttpClient,
BaseHttpClient,
HttpResponse,
_host_allowed,
_merge_headers,
_validate_presigned_url_security,
)


class HttpTests(unittest.TestCase):
Expand Down Expand Up @@ -119,6 +126,93 @@ def test_default_status_error(self):
):
client.request("GET", "/path")

def test_skip_auth_presigned_url_accepts_valid_https(self):
options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl")
client = BaseHttpClient(options)
response = httpx.Response(200, json={"ok": True})
with patch.object(client._client, "request", Mock(return_value=response)) as request_mock:
client.request("GET", "https://files.example.com/upload", skip_auth=True)
_, kwargs = request_mock.call_args
self.assertEqual(kwargs["url"], "https://files.example.com/upload")
self.assertNotIn("Authorization", kwargs["headers"])

def test_skip_auth_presigned_url_rejects_http_when_strict(self):
options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl")
client = BaseHttpClient(options)
with self.assertRaisesRegex(ValueError, "https is required"):
client.request("GET", "http://files.example.com/upload", skip_auth=True)

def test_skip_auth_presigned_url_allows_http_when_not_strict(self):
options = KsefClientOptions(
base_url="https://api-test.ksef.mf.gov.pl",
strict_presigned_url_validation=False,
)
client = BaseHttpClient(options)
response = httpx.Response(200, json={"ok": True})
with patch.object(client._client, "request", Mock(return_value=response)) as request_mock:
client.request("GET", "http://files.example.com/upload", skip_auth=True)
_, kwargs = request_mock.call_args
self.assertEqual(kwargs["url"], "http://files.example.com/upload")

def test_skip_auth_presigned_url_rejects_localhost_and_loopback(self):
options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl")
client = BaseHttpClient(options)
with self.assertRaisesRegex(ValueError, "localhost"):
client.request("GET", "https://localhost/upload", skip_auth=True)
with self.assertRaisesRegex(ValueError, "loopback"):
client.request("GET", "https://127.0.0.1/upload", skip_auth=True)

def test_skip_auth_presigned_url_rejects_private_ip_by_default(self):
options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl")
client = BaseHttpClient(options)
with self.assertRaisesRegex(ValueError, "private, link-local, and reserved IP"):
client.request("GET", "https://10.1.2.3/upload", skip_auth=True)

def test_skip_auth_presigned_url_allows_private_ip_when_opted_in(self):
options = KsefClientOptions(
base_url="https://api-test.ksef.mf.gov.pl",
allow_private_network_presigned_urls=True,
)
client = BaseHttpClient(options)
response = httpx.Response(200, json={"ok": True})
with patch.object(client._client, "request", Mock(return_value=response)) as request_mock:
client.request("GET", "https://10.1.2.3/upload", skip_auth=True)
_, kwargs = request_mock.call_args
self.assertEqual(kwargs["url"], "https://10.1.2.3/upload")

def test_skip_auth_presigned_url_allowlist_exact_and_subdomain(self):
options = KsefClientOptions(
base_url="https://api-test.ksef.mf.gov.pl",
allowed_presigned_hosts=["uploads.example.com"],
)
client = BaseHttpClient(options)
response = httpx.Response(200, json={"ok": True})
with patch.object(client._client, "request", Mock(return_value=response)):
client.request("GET", "https://uploads.example.com/path", skip_auth=True)
client.request("GET", "https://sub.uploads.example.com/path", skip_auth=True)

def test_skip_auth_presigned_url_allowlist_rejects_other_hosts(self):
options = KsefClientOptions(
base_url="https://api-test.ksef.mf.gov.pl",
allowed_presigned_hosts=["uploads.example.com"],
)
client = BaseHttpClient(options)
with self.assertRaisesRegex(ValueError, "allowed_presigned_hosts"):
client.request("GET", "https://other.example.com/path", skip_auth=True)

def test_host_allowed_skips_empty_and_ip_allowlist_entries(self):
self.assertTrue(
_host_allowed(
"sub.uploads.example.com",
["", "10.0.0.1", "uploads.example.com"],
)
)

def test_validate_presigned_url_security_rejects_missing_host(self):
options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl")
with self.assertRaisesRegex(ValueError, "host is missing"):
_validate_presigned_url_security(options, "https:///no-host")


class AsyncHttpTests(unittest.IsolatedAsyncioTestCase):
async def test_async_request(self):
Expand Down Expand Up @@ -193,6 +287,12 @@ async def test_async_raise_for_status_paths(self):
with self.assertRaises(KsefHttpError):
client._raise_for_status(response_http)

async def test_async_skip_auth_presigned_validation_rejects_localhost(self):
options = KsefClientOptions(base_url="https://api-test.ksef.mf.gov.pl")
client = AsyncBaseHttpClient(options)
with self.assertRaisesRegex(ValueError, "localhost"):
await client.request("GET", "https://localhost/upload", skip_auth=True)


if __name__ == "__main__":
unittest.main()
Loading