diff --git a/docs/configuration.md b/docs/configuration.md
index 9fe9a3a..159a68b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -11,6 +11,7 @@ options = KsefClientOptions(
base_url="https://api-test.ksef.mf.gov.pl",
timeout_seconds=30.0,
verify_ssl=True,
+ require_export_part_hash=True,
proxy=None,
custom_headers={"X-Custom-Header": "value"},
follow_redirects=False,
@@ -65,6 +66,15 @@ Opcja zwykle nie jest potrzebna. Włączenie ma uzasadnienie wyłącznie w środ
Domyślnie `True`. Wyłączenie ma uzasadnienie wyłącznie w specyficznych środowiskach testowych (np. z własnym MITM/proxy).
+### `require_export_part_hash`
+
+Domyślnie `True`. Dotyczy pobierania partów eksportu (`ExportWorkflow`, `AsyncExportWorkflow`):
+
+- dla każdego pobranego, zaszyfrowanego partu biblioteka liczy `SHA-256` i porównuje z nagłówkiem `x-ms-meta-hash` (base64), jeśli nagłówek jest obecny;
+- jeśli nagłówka `x-ms-meta-hash` brakuje, a opcja ma wartość `True`, biblioteka zgłasza `ValueError`;
+- jeśli hash się nie zgadza, biblioteka zgłasza `ValueError`;
+- ustawienie `False` pozwala przejść dalej, gdy nagłówek z hashem nie został zwrócony (gdy hash jest obecny, walidacja nadal jest wykonywana).
+
### `strict_presigned_url_validation`
Domyślnie `True`. Dla absolutnych URL używanych z `skip_auth=True` wymusza `https`. Przy wyłączeniu możliwe są URL `http`, ale nadal działa walidacja hosta/IP.
diff --git a/docs/workflows/export.md b/docs/workflows/export.md
index b1a0177..6178649 100644
--- a/docs/workflows/export.md
+++ b/docs/workflows/export.md
@@ -70,6 +70,10 @@ print(len(result.metadata_summaries), len(result.invoice_xml_files))
## Uwagi
- Części paczki są dostępne pod `package.parts[].url` i są pobierane **bez Bearer tokena** (pre-signed URL).
+- Dla każdego pobranego (zaszyfrowanego) partu workflow liczy hash `SHA-256` (base64) i porównuje z `x-ms-meta-hash`, jeśli nagłówek jest obecny.
+- Domyślnie (`KsefClientOptions.require_export_part_hash=True`) brak `x-ms-meta-hash` powoduje `ValueError`.
+- Niezgodność hash (`x-ms-meta-hash` vs. wyliczony hash) zawsze powoduje `ValueError`.
+- Jeśli integracja wymaga tolerowania braku nagłówka, ustaw `require_export_part_hash=False` w `KsefClientOptions` lub podczas tworzenia workflow.
- Linki do partów wygasają; pobranie powinno nastąpić bez zbędnej zwłoki.
- Paczka eksportu zawiera `_metadata.json` (dla deduplikacji i synchronizacji przyrostowej).
diff --git a/src/ksef_client/config.py b/src/ksef_client/config.py
index e384a84..16f29e4 100644
--- a/src/ksef_client/config.py
+++ b/src/ksef_client/config.py
@@ -40,6 +40,7 @@ class KsefClientOptions:
custom_headers: dict[str, str] | None = None
follow_redirects: bool = False
verify_ssl: bool = True
+ require_export_part_hash: bool = True
strict_presigned_url_validation: bool = True
allowed_presigned_hosts: list[str] | None = None
allow_private_network_presigned_urls: bool = False
diff --git a/src/ksef_client/services/workflows.py b/src/ksef_client/services/workflows.py
index f737dab..6f22990 100644
--- a/src/ksef_client/services/workflows.py
+++ b/src/ksef_client/services/workflows.py
@@ -1,6 +1,8 @@
from __future__ import annotations
import asyncio
+import base64
+import hashlib
import time
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
@@ -777,9 +779,18 @@ class PackageProcessingResult:
class ExportWorkflow:
- def __init__(self, invoices_client: InvoicesClient, http_client: _RequestHttpClient) -> None:
+ def __init__(
+ self,
+ invoices_client: InvoicesClient,
+ http_client: _RequestHttpClient,
+ require_export_part_hash: bool | None = None,
+ ) -> None:
self._invoices = invoices_client
self._download_helper = ExportDownloadHelper(http_client)
+ self._require_export_part_hash = _resolve_export_part_hash_requirement(
+ http_client=http_client,
+ explicit_value=require_export_part_hash,
+ )
def download_and_process_package(
self,
@@ -787,7 +798,15 @@ def download_and_process_package(
encryption_data: EncryptionData,
) -> PackageProcessingResult:
parts = package.get("parts") or []
- encrypted_parts = self._download_helper.download_parts(parts)
+ encrypted_parts_with_hash = self._download_helper.download_parts_with_hash(parts)
+ for index, (part_bytes, part_hash) in enumerate(encrypted_parts_with_hash, start=1):
+ _validate_export_part_hash(
+ part_bytes,
+ part_hash,
+ require_export_part_hash=self._require_export_part_hash,
+ part_index=index,
+ )
+ encrypted_parts = [part_bytes for part_bytes, _ in encrypted_parts_with_hash]
decrypted_parts = [
decrypt_aes_cbc_pkcs7(part, encryption_data.key, encryption_data.iv)
for part in encrypted_parts
@@ -816,9 +835,14 @@ def __init__(
self,
invoices_client: AsyncInvoicesClient,
http_client: _AsyncRequestHttpClient,
+ require_export_part_hash: bool | None = None,
) -> None:
self._invoices = invoices_client
self._download_helper = AsyncExportDownloadHelper(http_client)
+ self._require_export_part_hash = _resolve_export_part_hash_requirement(
+ http_client=http_client,
+ explicit_value=require_export_part_hash,
+ )
async def download_and_process_package(
self,
@@ -826,7 +850,15 @@ async def download_and_process_package(
encryption_data: EncryptionData,
) -> PackageProcessingResult:
parts = package.get("parts") or []
- encrypted_parts = await self._download_helper.download_parts(parts)
+ encrypted_parts_with_hash = await self._download_helper.download_parts_with_hash(parts)
+ for index, (part_bytes, part_hash) in enumerate(encrypted_parts_with_hash, start=1):
+ _validate_export_part_hash(
+ part_bytes,
+ part_hash,
+ require_export_part_hash=self._require_export_part_hash,
+ part_index=index,
+ )
+ encrypted_parts = [part_bytes for part_bytes, _ in encrypted_parts_with_hash]
decrypted_parts = [
decrypt_aes_cbc_pkcs7(part, encryption_data.key, encryption_data.iv)
for part in encrypted_parts
@@ -848,3 +880,33 @@ async def download_and_process_package(
invoice_xml_files[name] = content.decode("utf-8")
return PackageProcessingResult(metadata_summaries, invoice_xml_files)
+
+
+def _resolve_export_part_hash_requirement(
+    *,
+    http_client: _RequestHttpClient | _AsyncRequestHttpClient,
+    explicit_value: bool | None,
+) -> bool:
+    """Decide whether a missing export part hash must be treated as an error.
+
+    Precedence:
+      1. ``explicit_value`` when it is not ``None`` (returned as given);
+      2. ``require_export_part_hash`` read from ``http_client._options``,
+         coerced to ``bool``, when the client exposes such an object;
+      3. ``True`` when neither source is available.
+    """
+    if explicit_value is None:
+        # Fall back to the options object attached to the HTTP client, if any.
+        client_options = getattr(http_client, "_options", None)
+        if client_options is None:
+            return True
+        return bool(getattr(client_options, "require_export_part_hash", True))
+    return explicit_value
+
+
+def _validate_export_part_hash(
+    part_bytes: bytes,
+    expected_hash: str | None,
+    *,
+    require_export_part_hash: bool,
+    part_index: int,
+) -> None:
+    """Check one downloaded export part against its expected hash.
+
+    The expected value is compared, as text, with the base64-encoded SHA-256
+    digest of ``part_bytes``.
+
+    Args:
+        part_bytes: Raw bytes of the part, hashed exactly as given.
+        expected_hash: Base64 text to compare against, or ``None`` when no
+            hash is available for this part.
+        require_export_part_hash: When true, a ``None`` hash is an error;
+            when false, a ``None`` hash is silently accepted.
+        part_index: Part number used only in the error messages.
+
+    Raises:
+        ValueError: If the hash is required but missing, or if a provided
+            hash differs from the computed one.
+    """
+    if expected_hash is not None:
+        digest = hashlib.sha256(part_bytes).digest()
+        actual_hash = base64.b64encode(digest).decode("ascii")
+        if expected_hash == actual_hash:
+            return
+        # A provided hash is always enforced, regardless of the requirement flag.
+        raise ValueError(
+            f"Export part hash mismatch for part #{part_index}: "
+            f"expected '{expected_hash}', got '{actual_hash}'."
+        )
+    if require_export_part_hash:
+        raise ValueError(f"Missing export part hash for part #{part_index}.")
diff --git a/tests/test_services_workflows.py b/tests/test_services_workflows.py
index 1ca3dcf..88d7855 100644
--- a/tests/test_services_workflows.py
+++ b/tests/test_services_workflows.py
@@ -1,3 +1,5 @@
+import base64
+import hashlib
import json
import unittest
from dataclasses import dataclass
@@ -7,6 +9,7 @@
import httpx
from ksef_client.clients.invoices import AsyncInvoicesClient, InvoicesClient
+from ksef_client.config import KsefClientOptions
from ksef_client.http import HttpResponse
from ksef_client.services import workflows
from ksef_client.services.crypto import encrypt_aes_cbc_pkcs7, generate_iv, generate_symmetric_key
@@ -15,6 +18,10 @@
from tests.helpers import generate_rsa_cert
+def _sha256_b64(payload: bytes) -> str:
+    """Return the base64 text of the SHA-256 digest of ``payload``."""
+    digest = hashlib.sha256(payload).digest()
+    return base64.b64encode(digest).decode("ascii")
+
+
class RecordingHttp:
def __init__(self, content: bytes = b"ok", headers: dict | None = None) -> None:
self.calls: list[tuple[tuple[Any, ...], dict[str, Any]]] = []
@@ -368,11 +375,106 @@ class DummyInvoices:
pass
workflow = workflows.ExportWorkflow(cast(InvoicesClient, DummyInvoices()), RecordingHttp())
- with patch.object(workflow._download_helper, "download_parts", return_value=[encrypted]):
+ with patch.object(
+ workflow._download_helper,
+ "download_parts_with_hash",
+ return_value=[(encrypted, _sha256_b64(encrypted))],
+ ):
result = workflow.download_and_process_package({"parts": [{"url": "u"}]}, encryption)
self.assertEqual(result.metadata_summaries[0]["ksefNumber"], "1")
self.assertIn("inv.xml", result.invoice_xml_files)
+    def test_export_workflow_rejects_missing_hash_by_default(self):
+        """A part whose hash is None raises ValueError when no override is passed."""
+        aes_key = generate_symmetric_key()
+        aes_iv = generate_iv()
+        ciphertext = encrypt_aes_cbc_pkcs7(build_zip({"inv.xml": b""}), aes_key, aes_iv)
+        encryption = workflows.EncryptionData(key=aes_key, iv=aes_iv, encryption_info=None)  # type: ignore[arg-type]
+
+        class DummyInvoices:
+            pass
+
+        workflow = workflows.ExportWorkflow(cast(InvoicesClient, DummyInvoices()), RecordingHttp())
+        # None in the hash slot is what the workflow treats as a missing hash.
+        hashless_download = patch.object(
+            workflow._download_helper,
+            "download_parts_with_hash",
+            return_value=[(ciphertext, None)],
+        )
+        with hashless_download, self.assertRaises(ValueError) as raised:
+            workflow.download_and_process_package({"parts": [{"url": "u"}]}, encryption)
+        self.assertIn("Missing export part hash", str(raised.exception))
+
+    def test_export_workflow_allows_missing_hash_when_disabled(self):
+        """With require_export_part_hash=False, a part whose hash is None is still processed."""
+        aes_key = generate_symmetric_key()
+        aes_iv = generate_iv()
+        ciphertext = encrypt_aes_cbc_pkcs7(build_zip({"inv.xml": b""}), aes_key, aes_iv)
+        encryption = workflows.EncryptionData(key=aes_key, iv=aes_iv, encryption_info=None)  # type: ignore[arg-type]
+
+        class DummyInvoices:
+            pass
+
+        workflow = workflows.ExportWorkflow(
+            cast(InvoicesClient, DummyInvoices()),
+            RecordingHttp(),
+            require_export_part_hash=False,
+        )
+        hashless_download = patch.object(
+            workflow._download_helper,
+            "download_parts_with_hash",
+            return_value=[(ciphertext, None)],
+        )
+        with hashless_download:
+            result = workflow.download_and_process_package({"parts": [{"url": "u"}]}, encryption)
+        self.assertIn("inv.xml", result.invoice_xml_files)
+
+    def test_export_workflow_rejects_hash_mismatch(self):
+        """A hash that does not match the downloaded bytes raises ValueError."""
+        aes_key = generate_symmetric_key()
+        aes_iv = generate_iv()
+        ciphertext = encrypt_aes_cbc_pkcs7(build_zip({"inv.xml": b""}), aes_key, aes_iv)
+        encryption = workflows.EncryptionData(key=aes_key, iv=aes_iv, encryption_info=None)  # type: ignore[arg-type]
+
+        class DummyInvoices:
+            pass
+
+        workflow = workflows.ExportWorkflow(cast(InvoicesClient, DummyInvoices()), RecordingHttp())
+        # "bad-hash" cannot equal the base64 SHA-256 of the ciphertext.
+        mismatching_download = patch.object(
+            workflow._download_helper,
+            "download_parts_with_hash",
+            return_value=[(ciphertext, "bad-hash")],
+        )
+        with mismatching_download, self.assertRaises(ValueError) as raised:
+            workflow.download_and_process_package({"parts": [{"url": "u"}]}, encryption)
+        self.assertIn("Export part hash mismatch", str(raised.exception))
+
+    def test_export_workflow_reads_hash_requirement_from_client_options(self):
+        """Without an explicit argument, the flag comes from the HTTP client's ``_options``."""
+        aes_key = generate_symmetric_key()
+        aes_iv = generate_iv()
+        ciphertext = encrypt_aes_cbc_pkcs7(build_zip({"inv.xml": b""}), aes_key, aes_iv)
+        encryption = workflows.EncryptionData(key=aes_key, iv=aes_iv, encryption_info=None)  # type: ignore[arg-type]
+
+        class DummyInvoices:
+            pass
+
+        class RecordingHttpWithOptions(RecordingHttp):
+            # Exposes ``_options`` so the workflow can read the requirement from it.
+            def __init__(self) -> None:
+                super().__init__()
+                self._options = KsefClientOptions(
+                    base_url="https://api-test.ksef.mf.gov.pl",
+                    require_export_part_hash=False,
+                )
+
+        http_client = RecordingHttpWithOptions()
+        workflow = workflows.ExportWorkflow(cast(InvoicesClient, DummyInvoices()), http_client)
+        hashless_download = patch.object(
+            workflow._download_helper,
+            "download_parts_with_hash",
+            return_value=[(ciphertext, None)],
+        )
+        with hashless_download:
+            result = workflow.download_and_process_package({"parts": [{"url": "u"}]}, encryption)
+        self.assertIn("inv.xml", result.invoice_xml_files)
+
class AsyncWorkflowsTests(unittest.IsolatedAsyncioTestCase):
async def test_async_batch_upload_helper(self):
@@ -565,14 +667,87 @@ class DummyInvoices:
)
with patch.object(
workflow._download_helper,
- "download_parts",
- AsyncMock(return_value=[encrypted]),
+ "download_parts_with_hash",
+ AsyncMock(return_value=[(encrypted, _sha256_b64(encrypted))]),
):
result = await workflow.download_and_process_package(
{"parts": [{"url": "u"}]}, encryption
)
self.assertEqual(result.metadata_summaries[0]["ksefNumber"], "1")
+    async def test_async_export_workflow_rejects_missing_hash_by_default(self):
+        """A part whose hash is None raises ValueError when no override is passed."""
+        aes_key = generate_symmetric_key()
+        aes_iv = generate_iv()
+        ciphertext = encrypt_aes_cbc_pkcs7(build_zip({"inv.xml": b""}), aes_key, aes_iv)
+        encryption = workflows.EncryptionData(key=aes_key, iv=aes_iv, encryption_info=None)  # type: ignore[arg-type]
+
+        class DummyInvoices:
+            pass
+
+        workflow = workflows.AsyncExportWorkflow(
+            cast(AsyncInvoicesClient, DummyInvoices()),
+            RecordingAsyncHttp(),
+        )
+        # None in the hash slot is what the workflow treats as a missing hash.
+        hashless_download = patch.object(
+            workflow._download_helper,
+            "download_parts_with_hash",
+            AsyncMock(return_value=[(ciphertext, None)]),
+        )
+        with hashless_download, self.assertRaises(ValueError) as raised:
+            await workflow.download_and_process_package({"parts": [{"url": "u"}]}, encryption)
+        self.assertIn("Missing export part hash", str(raised.exception))
+
+    async def test_async_export_workflow_allows_missing_hash_when_disabled(self):
+        """With require_export_part_hash=False, a part whose hash is None is still processed."""
+        aes_key = generate_symmetric_key()
+        aes_iv = generate_iv()
+        ciphertext = encrypt_aes_cbc_pkcs7(build_zip({"inv.xml": b""}), aes_key, aes_iv)
+        encryption = workflows.EncryptionData(key=aes_key, iv=aes_iv, encryption_info=None)  # type: ignore[arg-type]
+
+        class DummyInvoices:
+            pass
+
+        workflow = workflows.AsyncExportWorkflow(
+            cast(AsyncInvoicesClient, DummyInvoices()),
+            RecordingAsyncHttp(),
+            require_export_part_hash=False,
+        )
+        hashless_download = patch.object(
+            workflow._download_helper,
+            "download_parts_with_hash",
+            AsyncMock(return_value=[(ciphertext, None)]),
+        )
+        with hashless_download:
+            result = await workflow.download_and_process_package(
+                {"parts": [{"url": "u"}]},
+                encryption,
+            )
+        self.assertIn("inv.xml", result.invoice_xml_files)
+
+    async def test_async_export_workflow_rejects_hash_mismatch(self):
+        """A hash that does not match the downloaded bytes raises ValueError."""
+        aes_key = generate_symmetric_key()
+        aes_iv = generate_iv()
+        ciphertext = encrypt_aes_cbc_pkcs7(build_zip({"inv.xml": b""}), aes_key, aes_iv)
+        encryption = workflows.EncryptionData(key=aes_key, iv=aes_iv, encryption_info=None)  # type: ignore[arg-type]
+
+        class DummyInvoices:
+            pass
+
+        workflow = workflows.AsyncExportWorkflow(
+            cast(AsyncInvoicesClient, DummyInvoices()),
+            RecordingAsyncHttp(),
+        )
+        # "bad-hash" cannot equal the base64 SHA-256 of the ciphertext.
+        mismatching_download = patch.object(
+            workflow._download_helper,
+            "download_parts_with_hash",
+            AsyncMock(return_value=[(ciphertext, "bad-hash")]),
+        )
+        with mismatching_download, self.assertRaises(ValueError) as raised:
+            await workflow.download_and_process_package({"parts": [{"url": "u"}]}, encryption)
+        self.assertIn("Export part hash mismatch", str(raised.exception))
+
if __name__ == "__main__":
unittest.main()