Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions ChatApp/file_crypto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from __future__ import annotations

import os
from typing import Union

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC


_CHUNK_SIZE = 4096
_SALT_SIZE = 16
_NONCE_SIZE = 12
_TAG_SIZE = 16


def _derive_key(password: Union[str, bytes], salt: bytes) -> bytes:
"""Derive a 256-bit key from the given password and salt."""
if isinstance(password, str):
password = password.encode()
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=390000,
backend=default_backend(),
)
return kdf.derive(password)


def encrypt_file(
in_path: str,
out_path: str,
password: Union[str, bytes],
chunk_size: int = _CHUNK_SIZE,
) -> None:
"""Encrypt ``in_path`` to ``out_path`` using AES-GCM.

The output file layout is ``salt`` + ``nonce`` + ciphertext + ``tag``. Data
is processed in chunks so large files do not need to be fully loaded into
memory.
"""
salt = os.urandom(_SALT_SIZE)
key = _derive_key(password, salt)
nonce = os.urandom(_NONCE_SIZE)
cipher = Cipher(
algorithms.AES(key),
modes.GCM(nonce),
backend=default_backend(),
)
encryptor = cipher.encryptor()

try:
with open(in_path, "rb") as fin, open(out_path, "wb") as fout:
fout.write(salt)
fout.write(nonce)
while True:
chunk = fin.read(chunk_size)
if not chunk:
break
data = encryptor.update(chunk)
if data:
fout.write(data)
encryptor.finalize()
fout.write(encryptor.tag)
except Exception:
if os.path.exists(out_path):
os.remove(out_path)
raise


def decrypt_file(
in_path: str,
out_path: str,
password: Union[str, bytes],
chunk_size: int = _CHUNK_SIZE,
) -> None:
"""Decrypt ``in_path`` to ``out_path`` verifying the authentication tag."""
total_size = os.path.getsize(in_path)
if total_size < _SALT_SIZE + _NONCE_SIZE + _TAG_SIZE:
raise ValueError("Ciphertext too small")

with open(in_path, "rb") as fin:
salt = fin.read(_SALT_SIZE)
nonce = fin.read(_NONCE_SIZE)
key = _derive_key(password, salt)
cipher = Cipher(
algorithms.AES(key),
modes.GCM(nonce),
backend=default_backend(),
)
decryptor = cipher.decryptor()

ciphertext_len = total_size - _SALT_SIZE - _NONCE_SIZE - _TAG_SIZE
remaining = ciphertext_len
try:
with open(out_path, "wb") as fout:
while remaining > 0:
chunk = fin.read(min(chunk_size, remaining))
if not chunk:
break
remaining -= len(chunk)
data = decryptor.update(chunk)
if data:
fout.write(data)
tag = fin.read(_TAG_SIZE)
decryptor.finalize_with_tag(tag)
except Exception:
if os.path.exists(out_path):
os.remove(out_path)
raise
3 changes: 1 addition & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Use an official Python runtime as a parent image
FROM python:3.11-slim
FROM python:3.11.13-slim-bullseye

LABEL maintainer="Psychevus"
LABEL description="Django WebSocket Chat App"
Expand All @@ -9,7 +9,6 @@ WORKDIR /app

# Install system dependencies
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y --no-install-recommends gcc libffi-dev libssl-dev \
default-libmysqlclient-dev && \
rm -rf /var/lib/apt/lists/*
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile.demo
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.11-slim
FROM python:3.11.13-slim-bullseye

ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ wscat -c ws://localhost:8000/ws/room/test/

```bash
pip install -r requirements-dev.txt
pip install -r requirements.txt # includes cryptography and pyOpenSSL for file encryption
pytest --cov=ChatApp --cov=WebSocketChatApp
```

Expand Down
16 changes: 9 additions & 7 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
Django~=4.2.5
redis~=5.0.1
redis~=6.2.0
channels~=3.0.5
django_ratelimit>=3.0.2
mysqlclient~=2.1.0
celery>=5.3,<6
django-allauth
djangosaml2~=1.7.0
djangosaml2~=1.11.1
django-scim2
bleach~=6.1.0
bleach~=6.2.0
pyotp~=2.9.0
kafka-python~=2.0.2
PyJWT~=2.8.0
Expand All @@ -26,7 +26,9 @@ daphne>=3,<4
python-json-logger~=3.3
djangorestframework~=3.16.0
drf-spectacular~=0.27.0
pyfcm~=2.0
apns2~=0.7

deprecated~=1.2.14
pyfcm~=2.0
apns2==0.7.1

deprecated~=1.2.14
cryptography>=45.0,<46
pyOpenSSL>=25.1,<26
31 changes: 31 additions & 0 deletions tests/test_file_crypto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
import tempfile

from ChatApp.file_crypto import encrypt_file, decrypt_file


def _roundtrip(data: bytes, password: str = "pass"):
with tempfile.TemporaryDirectory() as tmp:
plain = os.path.join(tmp, "plain.bin")
enc = os.path.join(tmp, "enc.bin")
dec = os.path.join(tmp, "dec.bin")
with open(plain, "wb") as fh:
fh.write(data)
encrypt_file(plain, enc, password)
decrypt_file(enc, dec, password)
with open(dec, "rb") as fh:
return fh.read()


def test_encrypt_decrypt_empty_file():
assert _roundtrip(b"") == b""


def test_encrypt_decrypt_small_file():
data = b"hello world" * 5
assert _roundtrip(data) == data


def test_encrypt_decrypt_large_file():
data = os.urandom(1024 * 1024) # 1MB
assert _roundtrip(data) == data