diff --git a/.env b/.env new file mode 100644 index 0000000..46305e4 --- /dev/null +++ b/.env @@ -0,0 +1,19 @@ +# .env + +CELERY_BROKER_URL=redis://redis:6379/0 +CELERY_RESULT_BACKEND=db+postgresql+psycopg://user:password@database:5432/alpha + +POSTGRES_USER=user +POSTGRES_PASSWORD=password +POSTGRES_DB=alpha +POSTGRES_HOST=database +POSTGRES_PORT=5432 +POSTGRES_URL=postgresql+psycopg://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB} + + +VALID_CLIENT_ID=UezjwqiRG6cKJRtSdTBjrkLI09HpWPxDLdweOisr +VALID_CLIENT_SECRET=V5tSsR9VSWISVamUl8KweBiydAZ2SKV7Rs4NqZIeTQBB42IMrAGB1SRpoJpmIvEdcKQaGjrt6XLvRmwcUnqWV5CzO7ReHHHWS1x5GwxvxOI6tq87HpV8DlZZStGqZGUL + +JWT_SECRET_KEY=your_super_secret_key +JWT_ALGORITHM=HS256 +ACCESS_TOKEN_EXPIRE_SECONDS=31536000 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3aa1d49 --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +.vscode/ +*.vscode +*$py.class +*.ruff_cache/* +.idea +*.idea +.idea/ + +# Environments +# .env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ \ No newline at end of file diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..942803d --- /dev/null +++ b/.pylintrc @@ -0,0 +1,144 @@ +[MASTER] +ignore=.git,__pycache__,migrations,node_modules,.venv +init-hook='import sys; sys.path.append("api")' +load-plugins=pylint_celery,pylint_pytest +persistent=yes + +[MESSAGES CONTROL] +disable= + missing-docstring, + invalid-name, + too-few-public-methods, + too-many-arguments, + duplicate-code, + no-member, + unused-argument, + redefined-outer-name, + import-error, + no-name-in-module, + broad-exception-raised, + logging-fstring-interpolation + +[REPORTS] +output-format=text +reports=no +score=no + +[BASIC] +good-names=i,j,k,_,db +bad-names=foo,bar,baz +variable-rgx=[a-z_][a-z0-9_]{2,30}$ +function-rgx=([a-z_][a-z0-9_]{2,40}|test_[a-z0-9_]+)$ +argument-rgx=[a-z_][a-z0-9_]{2,30}$ +attr-rgx=[a-z_][a-z0-9_]{2,30}$ +class-rgx=[A-Z][a-zA-Z0-9]+$ +const-rgx=(([A-Z_][A-Z0-9_]*)|(__.*__))$ + +[FORMAT] +max-line-length=88 +indent-string=' ' +ignore-long-lines=^\s*(# )?? 
+
+[TYPECHECK]
+ignore-mixin-members=yes
+generated-members=request,user,objects,status_code,celery_app
+
+[DESIGN]
+max-args=6
+max-returns=10
+max-branches=15
+max-statements=60
+max-locals=20
+max-attributes=10
+
+[IMPORTS]
+known-standard-library=typing,os,sys
+known-third-party=fastapi,pydantic,celery,sqlalchemy,uvicorn,psycopg2,redis
+known-first-party=api,services
+import-order-style=google
+
+[MISCELLANEOUS]
+notes=TODO,FIXME,XXX
+
+[LOGGING]
+logging-modules=logging
+
+[VARIABLES]
+dummy-variables-rgx=_|dummy
+
+[SIMILARITIES]
+min-similarity-lines=4
+ignore-comments=yes
+ignore-docstrings=yes
+ignore-imports=yes
+
+[EXCEPTIONS]
+overgeneral-exceptions=Exception
\ No newline at end of file
diff --git a/IMPROVEMENTS.md b/IMPROVEMENTS.md
new file mode 100644
index 0000000..cc4b2ab
--- /dev/null
+++ b/IMPROVEMENTS.md
@@ -0,0 +1,135 @@
+## API test improvement ideas
+
+These improvements were identified but not implemented for lack of time. They aim to strengthen test coverage, system robustness, and edge-case handling.
+
+### Tests for CSV file import
+
+- [ ] Add a test with a malformed file (e.g. missing headers, incorrect columns).
+- [ ] Add a test with a file of the wrong type (e.g. `.txt`, `.json`) to verify MIME filtering.
+- [ ] Test importing a very large file to validate the behaviour under heavy load.
+
+### Security tests
+
+- [ ] Temporarily remove or disable the authentication mock (`fake_get_current_user`) to test the real token-related behaviour.
+- [ ] Check the behaviour **without an authentication token**: a request with no `Authorization` header must return a 401.
+- [ ] Check the behaviour **with an invalid or expired token**: the route must deny access with an appropriate error message.
+
+### Edge-case and error tests
+
+- [ ] Test the `/api/imports/{job_id}/status` route with an invalid UUID (bad format → 422 expected).
+- [ ] Test `/api/clients` without pagination to validate the fallback to the default values.
+- [ ] Add a test on a non-existent route to check 404 handling.
+
+
+
+
+## Possible improvements to the import task tests
+
+These ideas aim to strengthen the robustness of the tests around the `run_import_job` task and the `process_csv` function. For lack of time they were not put in place, but they would be a priority in a production context.
+
+### Functional and validation cases
+
+- [ ] Import of empty CSV files, with no rows to process.
+- [ ] CSV files made up only of invalid rows (total validation failure).
+- [ ] Data containing malformed, missing, or incorrect dates.
+- [ ] Rows with missing or empty fields (e.g. missing email).
+- [ ] CSVs with duplicates or logical inconsistencies (the same email on several rows).
+👉 Currently, **no logic to detect or reject duplicates or unusual encodings in the data** (for example on the email) is implemented.
+  It would be worth adding a **schema/model-level check** to avoid inserting identical clients, or data with accents or emojis for example.
+
+### Performance and scalability tests
+
+- [ ] Handle very large CSV files to test scaling behaviour.
+- [ ] Measure memory consumption and processing time on large volumes.
+
+### Technical error cases
+
+- [ ] Case where the database connection fails or the session is not available.
+- [ ] Scenarios where the Celery task exceeds the maximum number of allowed retries.
+- [ ] Handling of unexpected errors during CSV processing (e.g. Pydantic crash, encoding issues).
+
+### Integration tests
+
+- [ ] Add real integration tests that run the `process_csv` function against real CSV files and a test database (no mocks), to validate the full behaviour in a realistic environment.
+
+
+### Possible improvements to the models (`models.py`)
+
+- [ ] Add a uniqueness constraint on `Client.email` to avoid duplicates at the database level.
+  👉 Currently nothing guarantees that the same email is not inserted several times.
+
+- [ ] Add an `import_job_id` field (foreign key) on `Client` to trace where a record came from.
+  👉 This would link each client to the corresponding import and make auditing easier.
+
+- [ ] Add a composite index on (`email`, `birth_date`) if these fields are frequently used for combined lookups.
+
+- [ ] Add field-level constraints:
+  - `String(length)` instead of `String` without a size (for example `String(255)` for `email`)
+
+- [ ] In `ImportJob`:
+  - Add a `filename` (or `source`) field to keep track of the CSV file used.
+  - Allow an `error_log` field (text or JSON) to store rejected rows or the exact errors detected.
+
+### Possible improvements to api/crud/base.py
+
+- [ ] Add `update` and `delete` methods to complete the CRUD operations (see the sketch after this section).
+  👉 The current class matches the requested scope (Create and Read only),
+  but would remain incomplete in a general CRUD context.
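As a rough illustration (not part of this diff), `update` and `delete` could follow the same session-handling and logging pattern as the existing `create`/`get_one` methods. The sketch below assumes the methods are added to `CRRepository`, reusing its `self.model` and the module-level `log`, and that `obj_in` is a Pydantic schema:

```python
# Sketch of additions to CRRepository in api/crud/base.py.
# Assumes the existing `self.model`, the module-level `log`, and a Pydantic `obj_in`.
from typing import Optional

from sqlalchemy.orm import Session


def update(self, db: Session, id: int, obj_in) -> Optional[object]:
    try:
        db_obj = db.query(self.model).get(id)
        if db_obj is None:
            return None
        # Only overwrite the fields explicitly provided on the schema.
        for field, value in obj_in.dict(exclude_unset=True).items():
            setattr(db_obj, field, value)
        db.commit()
        db.refresh(db_obj)
        log.info(f"Updated {self.model.__name__} with ID={id}")
        return db_obj
    except Exception as e:
        db.rollback()
        log.error(f"Failed to update {self.model.__name__} with ID={id}: {e}", exc_info=True)
        raise


def delete(self, db: Session, id: int) -> bool:
    try:
        db_obj = db.query(self.model).get(id)
        if db_obj is None:
            return False
        db.delete(db_obj)
        db.commit()
        log.info(f"Deleted {self.model.__name__} with ID={id}")
        return True
    except Exception as e:
        db.rollback()
        log.error(f"Failed to delete {self.model.__name__} with ID={id}: {e}", exc_info=True)
        raise
```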
+
+### Possible improvements to the Pydantic schemas
+
+- [ ] Add extra validation constraints in `ClientSchema`, such as:
+  - `name`: minimum/maximum length (`min_length`, `max_length`)
+  - `birth_date`: custom validation to forbid future dates
+
+### Schema and validation improvements
+
+- [ ] Add a validator that rejects future birth dates.
+- [ ] Check that the `name` field is not empty or made up only of whitespace.
+- [ ] Return custom error messages for invalid emails.
+- [ ] Add a root validator to detect cross-field inconsistencies.
+- [ ] Add validation tests on clearly invalid inputs.
+- [ ] Use `EmailStr` consistently (it is used in `ClientSchema` but not in `ClientResponse`)
+  👉 This would keep strong validation on output as well.
+
+### Authentication / security improvements
+
+- [ ] Add a credential provider (`CredentialProvider`) to centralise access to `client_id` and `client_secret` (e.g. from a file, a database, or a secrets manager).
+- [ ] Handle token expiry on the client side by adding a **periodic renewal** mechanism via a `/refresh_token` endpoint.
+- [ ] Implement **permissions** and **roles** inside the JWT tokens (e.g. `"role": "admin"`, `"scopes": ["read", "write"]`) for finer-grained access control on the routes.
+- [ ] Log every authentication attempt, successful or failed, with the `client_id` and the IP (without logging secrets).
+- [ ] Add automated tests covering:
+  - Missing `Authorization` header
+  - Invalid token format
+  - Expired token
+  - Attempt with invalid credentials
+  - Access to a protected route without the right role or scope
+
+### Security improvements: rate limiting (throttling)
+
+- [ ] Implement global rate limiting on sensitive endpoints (e.g. `/access_token`, `/import`) to prevent abuse.
+- [ ] Implement throttling that recognises the user's role (e.g. `"role": "admin"`) from the JWT token.
+- [ ] Return a 429 error (`Too Many Requests`) when the limit is reached.
+- [ ] Add HTTP headers to inform the client:
+  - `X-RateLimit-Limit`: maximum number of requests allowed
+  - `X-RateLimit-Remaining`: how many requests remain
+  - `Retry-After`: how long to wait before retrying
+
+### Caching with Redis
+
+- [ ] Set up a Redis cache to improve the performance of frequently hit endpoints.
+- [ ] Avoid caching sensitive or personal data, in particular client information (`email`, `birth_date`), to respect confidentiality and security rules.
+- [ ] Provide a cache invalidation mechanism when the underlying data changes (signal).
+
+
+# Email notification via a signal after CSV processing
+
+## Description
+- [ ] Set up a **signal** (or an event system) that fires once the CSV import task has finished.
+- [ ] This signal will call a function responsible for sending a notification email to the recipient(s).
+
+## Details
+- [ ] Define an `import_completed` signal.
+- [ ] Emit this signal at the end of the CSV processing function/task, passing a summary of the run.
+- [ ] Connect a handler to the signal that sends the notification email.
+- [ ] This approach decouples CSV processing from the notification, which makes maintenance easier (see the sketch below).
\ No newline at end of file
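As a rough illustration (again, not part of this diff) of how the signal could be wired, assuming the `blinker` package (not currently in `requirements.txt`) and a hypothetical `send_import_report` receiver:

```python
# Hypothetical api/signals.py — illustrative only.
from blinker import signal

# Named signal fired when a CSV import job finishes.
import_completed = signal("import_completed")


def send_import_report(sender, job_id, row_stats, **kwargs):
    """Receiver: notify someone that the import finished."""
    # Swap this print for a real email backend (SMTP, a provider API, ...).
    print(f"Import {job_id} finished: {row_stats}")


import_completed.connect(send_import_report)

# At the end of process_csv() / run_import_job(), something like:
# import_completed.send("import_service", job_id=str(job.id), row_stats=job.row_stats)
```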
diff --git a/README.md b/README.md
index c42d618..b720d31 100644
--- a/README.md
+++ b/README.md
@@ -59,17 +59,169 @@ L’objectif est de démontrer votre maîtrise de Python, Celery, Redis et Postg
 - Gestion du cache avec Redis
 - Sécuriser les endpoints
-## Exemple de scénario de test rapide
-```bash
-# Upload du fichier CSV
-curl -F "file=@clients.csv" http://localhost:8000/api/imports
-# → { "job_id": "abc123", "status": "PENDING" }
+
+## Running the project
-# Vérifier le statut du traitement
-curl http://localhost:8000/api/imports/abc123/status
-# → { "job_id": "abc123", "status": "SUCCESS", "total": 50, "valid": 48, "errors": 2 }
+Make sure you have Docker and Docker Compose installed.
-# Récupérer la liste des clients
-curl http://localhost:8000/api/clients?page=1&per_page=20
-# → { "clients": [ … ], "page":1, "total_pages":3 }
\ No newline at end of file
+### 1. Start the containers
+
+```bash
+docker compose up --build   # first run (build the images)
+docker compose up           # subsequent runs
+```
+
+### 2. Apply the migrations (create the tables)
+
+The migration files are already present in the project (`migrations/` is versioned). You only need to apply them:
+
+```bash
+docker compose exec app alembic upgrade head
+```
+
+---
+
+## Testing the API
+
+### 1. Authentication (client credentials)
+
+```bash
+curl -X POST http://localhost:8000/auth/access_token \
+  -H "Content-Type: application/x-www-form-urlencoded" \
+  -d "grant_type=client_credentials&client_id=UezjwqiRG6cKJRtSdTBjrkLI09HpWPxDLdweOisr&client_secret=V5tSsR9VSWISVamUl8KweBiydAZ2SKV7Rs4NqZIeTQBB42IMrAGB1SRpoJpmIvEdcKQaGjrt6XLvRmwcUnqWV5CzO7ReHHHWS1x5GwxvxOI6tq87HpV8DlZZStGqZGUL"
+```
+
+### 2. CSV file import
+
+```bash
+curl -F "file=@clients_utf8.csv" http://localhost:8000/api/imports \
+  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJVZXpqd3FpUkc2Y0tKUnRTZFRCanJrTEkwOUhwV1B4RExkd2VPaXNyIiwiZXhwIjoxNzc5NzEzNzE5LCJpYXQiOjE3NDgxNzc3MTl9.w5E64b78xLpjLEXuTt9CzYtJKlgomaQO_RrLi2_Edz8"
+  # → {"job_id":"e652107a-2be1-4d20-8846-1f426e3341aa","status":"pending"}
+```
+
+### 3. List the imported clients
+
+```bash
+curl "http://localhost:8000/api/clients?page=1&per_page=20" \
+  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJVZXpqd3FpUkc2Y0tKUnRTZFRCanJrTEkwOUhwV1B4RExkd2VPaXNyIiwiZXhwIjoxNzc5NzEzNzE5LCJpYXQiOjE3NDgxNzc3MTl9.w5E64b78xLpjLEXuTt9CzYtJKlgomaQO_RrLi2_Edz8"
+  # → {"clients":[...],"page":1,"total_pages":3}
+```
+
+### 4. Track the status of an import job
+
+```bash
+curl http://localhost:8000/api/imports/e559be95-fb8e-4114-b256-9ff081267f52/status \
+  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJVZXpqd3FpUkc2Y0tKUnRTZFRCanJrTEkwOUhwV1B4RExkd2VPaXNyIiwiZXhwIjoxNzc5NzEzNzE5LCJpYXQiOjE3NDgxNzc3MTl9.w5E64b78xLpjLEXuTt9CzYtJKlgomaQO_RrLi2_Edz8"
+  # → {"job_id":"e559be95-fb8e-4114-b256-9ff081267f52","status":"COMPLETED","total":10,"valid":10,"errors":0}
+```
+
+---
+
+## Running the tests
+
+```bash
+docker exec app_container env PYTHONPATH=/usr/src/app pytest -v tests
+# →
+tests/routers/test_routers.py::test_get_clients_success PASSED [ 6%]
+tests/routers/test_routers.py::test_get_clients_page_2 PASSED [ 13%]
+tests/routers/test_routers.py::test_get_clients_page_too_high PASSED [ 20%]
+tests/routers/test_routers.py::test_get_clients_invalid_per_page PASSED [ 26%]
+tests/routers/test_routers.py::test_get_clients_invalid_page PASSED [ 33%]
+tests/routers/test_routers.py::test_get_clients_empty PASSED [ 40%]
+tests/routers/test_routers.py::test_get_import_status_success PASSED [ 46%]
+tests/routers/test_routers.py::test_get_import_status_in_progress PASSED [ 53%]
+tests/routers/test_routers.py::test_get_import_status_not_found PASSED [ 60%]
+tests/routers/test_routers.py::test_import_clients_success PASSED [ 66%]
+tests/routers/test_routers.py::test_import_clients_invalid_file_save PASSED [ 73%]
+tests/routers/test_routers.py::test_import_clients_dispatch_fails PASSED [ 80%]
+tests/routers/test_routers.py::test_import_clients_missing_file PASSED [ 86%]
+tests/test_tasks.py::test_run_import_job_success PASSED [ 93%]
+tests/test_tasks.py::test_run_import_job_failure_retry PASSED [100%]
+======================== 15 passed, 8 warnings in 2.52s ========================
+```
+
+---
+
+## Other useful commands
+
+### Stop the containers
+
+```bash
+docker compose down
+```
+
+### Generate a new migration (if the SQLAlchemy models change)
+
+```bash
+docker compose exec app alembic revision --autogenerate -m "your message here"
+docker compose exec app alembic upgrade head
+```
+
+---
+
+## Permissions on WSL2 / Linux
+
+If you run into `EACCES` (permission denied) errors:
+
+```bash
+sudo chown -R $(whoami):$(whoami) migrations/
+chmod -R u+rw migrations/
+```
+
+---
+
+## Inspecting the database (PostgreSQL)
+
+```bash
+docker exec -it database_container psql -U user -d alpha
+```
+
+Inside `psql`:
+
+```sql
+SELECT * FROM import_jobs;
+SELECT * FROM clients;
+```
+
+---
+
+## Project structure
+
+```
+csv-client-processor/
+│
+├── README.md
+├── alembic.ini
+├── docker-compose.yml
+├── pyrightconfig.json
+├── clients_utf8.csv
+│
+└── api/                  # Main folder of the FastAPI application
+    ├── Dockerfile
+    ├── main.py           # FastAPI application entry point
+    ├── alembic.ini
+    ├── tasks.py          # Celery tasks
+    ├── crud/             # DB manipulation functions (Create, Read)
+    ├── database/         # Database connection and configuration
+    ├── log_config/       # Logging configuration (LOGGING_CONFIG)
+    ├── migrations/       # Alembic migration folder
+    ├── models/           # SQLAlchemy models
+    ├── routers/          # FastAPI routing files (endpoints)
+    ├── schemas/          # Pydantic schemas for validation and serialisation
+    ├── security/         # Authentication
+    ├── services/         # Business logic (CSV processing)
+    └── tests/            # Tests
+```
+
+---
+
+## Notes
+
+- Don't forget to adapt the JWT tokens and credentials to your own values.
+- Since the migrations are versioned, a single `alembic upgrade head` is enough to create all the tables.
+- The `.env` file is included to make local testing easier and to guarantee the project runs immediately in a development environment.
+
+
+## Improvements
+
+For the improvement ideas that were identified, see [IMPROVEMENTS.md](IMPROVEMENTS.md).
diff --git a/alembic.ini b/alembic.ini
new file mode 100644
index 0000000..ee77879
--- /dev/null
+++ b/alembic.ini
@@ -0,0 +1,5 @@
+[alembic]
+script_location = ./migrations
+prepend_sys_path = .
+version_path_separator = os
+sqlalchemy.url = postgresql://%(POSTGRES_USER)s:%(POSTGRES_PASSWORD)s@%(POSTGRES_HOST)s:%(POSTGRES_PORT)s/%(POSTGRES_DB)s
diff --git a/api/Dockerfile b/api/Dockerfile
new file mode 100644
index 0000000..9c2bde4
--- /dev/null
+++ b/api/Dockerfile
@@ -0,0 +1,10 @@
+FROM python:3.13-slim
+
+WORKDIR /usr/src/app
+
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY . .
+
+CMD ["celery", "-A", "tasks", "worker", "--loglevel=info"]
\ No newline at end of file
diff --git a/api/__init__.py b/api/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/api/alembic.ini b/api/alembic.ini
new file mode 100644
index 0000000..d7b1c14
--- /dev/null
+++ b/api/alembic.ini
@@ -0,0 +1,141 @@
+# A generic, single database configuration.
+
+[alembic]
+# path to migration scripts.
+# this is typically a path given in POSIX (e.g. forward slashes)
+# format, relative to the token %(here)s which refers to the location of this
+# ini file
+script_location = %(here)s/migrations
+
+# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
+# Uncomment the line below if you want the files to be prepended with date and time
+# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
+# for all available tokens
+# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
+
+# sys.path path, will be prepended to sys.path if present.
+# defaults to the current working directory. for multiple paths, the path separator
+# is defined by "path_separator" below.
+prepend_sys_path = .
+
+
+# timezone to use when rendering the date within the migration file
+# as well as the filename.
+# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
+# Any required deps can installed by adding `alembic[tz]` to the pip requirements
+# string value is passed to ZoneInfo()
+# leave blank for localtime
+# timezone =
+
+# max length of characters to apply to the "slug" field
+# truncate_slug_length = 40
+
+# set to 'true' to run the environment during
+# the 'revision' command, regardless of autogenerate
+# revision_environment = false
+
+# set to 'true' to allow .pyc and .pyo files without
+# a source .py file to be detected as revisions in the
+# versions/ directory
+# sourceless = false
+
+# version location specification; This defaults
+# to /versions. When using multiple version
+# directories, initial revisions must be specified with --version-path.
+# The path separator used here should be the separator specified by "path_separator"
+# below.
+# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
+
+# path_separator; This indicates what character is used to split lists of file
+# paths, including version_locations and prepend_sys_path within configparser
+# files such as alembic.ini.
+# The default rendered in new alembic.ini files is "os", which uses os.pathsep +# to provide os-dependent path splitting. +# +# Note that in order to support legacy alembic.ini files, this default does NOT +# take place if path_separator is not present in alembic.ini. If this +# option is omitted entirely, fallback logic is as follows: +# +# 1. Parsing of the version_locations option falls back to using the legacy +# "version_path_separator" key, which if absent then falls back to the legacy +# behavior of splitting on spaces and/or commas. +# 2. Parsing of the prepend_sys_path option falls back to the legacy +# behavior of splitting on spaces, commas, or colons. +# +# Valid values for path_separator are: +# +# path_separator = : +# path_separator = ; +# path_separator = space +# path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# database URL. This is consumed by the user-maintained env.py script only. +# other means of configuring database URLs may be customized within the env.py +# file. +sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the exec runner, execute a binary +# hooks = ruff +# ruff.type = exec +# ruff.executable = %(here)s/.venv/bin/ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Logging configuration. This is also consumed by the user-maintained +# env.py script only. +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/api/crud/__init__.py b/api/crud/__init__.py new file mode 100644 index 0000000..283c224 --- /dev/null +++ b/api/crud/__init__.py @@ -0,0 +1,7 @@ +from crud.clients import client_crud +from crud.import_jobs import import_job_crud + +__all__ = [ + "client_crud", + "import_job_crud", +] diff --git a/api/crud/base.py b/api/crud/base.py new file mode 100644 index 0000000..6de06b9 --- /dev/null +++ b/api/crud/base.py @@ -0,0 +1,73 @@ +from typing import List, Optional, Type + +from sqlalchemy.orm import Session + +from log_config.log import get_logger + +log = get_logger(__name__) + + +class CRRepository: + """ + Generic base class for basic Create and Read + operations on a SQLAlchemy model. + + This repository provides: + - `create`: Insert a new record into the database. + - `get_one`: Retrieve a single record by its ID. + - `get_many`: Retrieve multiple records with pagination support. 
+ """ + + def __init__(self, model: Type): + self.model = model + + def get_one(self, db: Session, id: int) -> Optional[object]: + try: + record = db.query(self.model).get(id) + log.debug( + f"Retrieved {self.model.__name__} with ID={id}" + ) + return record + except Exception as e: + log.error( + f"Error retrieving {self.model.__name__} with " + f"ID={id}: {e}", + exc_info=True, + ) + + return None + + def get_many(self, db: Session, skip: int = 0, limit: int = 10) -> List: + try: + records = db.query(self.model).offset(skip).limit(limit).all() + log.debug( + f"Retrieved {len(records)} " + f" {self.model.__name__}(s)" + f"(skip={skip}, limit={limit})" + ) + return records + except Exception as e: + log.error( + f"Error retrieving many {self.model.__name__}: {e}", + exc_info=True, + ) + return [] + + def create(self, db: Session, obj_in) -> object: + try: + db_obj = self.model(**obj_in.dict()) + db.add(db_obj) + db.commit() + db.refresh(db_obj) + log.info( + f"Created new {self.model.__name__} " + f"with ID={db_obj.id}" + ) + return db_obj + except Exception as e: + db.rollback() + log.error( + f"Failed to create {self.model.__name__}: {e}", + exc_info=True + ) + raise diff --git a/api/crud/clients.py b/api/crud/clients.py new file mode 100644 index 0000000..91c3b9e --- /dev/null +++ b/api/crud/clients.py @@ -0,0 +1,19 @@ +from sqlalchemy.orm import Session + +from models import Client + +from .base import CRRepository + + +class CRRClient(CRRepository): + def __init__(self): + super().__init__(Client) + + def get_paginated(self, db: Session, skip: int, limit: int) -> dict: + query = db.query(self.model).order_by(self.model.id) + total = query.count() + items = query.offset(skip).limit(limit).all() + return {"total": total, "skip": skip, "limit": limit, "items": items} + + +client_crud = CRRClient() diff --git a/api/crud/import_jobs.py b/api/crud/import_jobs.py new file mode 100644 index 0000000..417e4ce --- /dev/null +++ b/api/crud/import_jobs.py @@ -0,0 +1,5 @@ +from models import ImportJob + +from .base import CRRepository + +import_job_crud = CRRepository(ImportJob) diff --git a/api/crud/main.py b/api/crud/main.py new file mode 100644 index 0000000..847b7fa --- /dev/null +++ b/api/crud/main.py @@ -0,0 +1,15 @@ +from fastapi import FastAPI + +from routers import clients_router, imports_router, status_router + +import logging.config +from log_config.logging_config import LOGGING_CONFIG + +logging.config.dictConfig(LOGGING_CONFIG) + + +app = FastAPI(title="CSV Ingestion", version="0.1.0") + +app.include_router(imports_router) +app.include_router(status_router, prefix="/api/imports", tags=["imports"]) +app.include_router(clients_router, prefix="/api/clients", tags=["clients"]) diff --git a/api/database/base_class.py b/api/database/base_class.py new file mode 100644 index 0000000..3fa8152 --- /dev/null +++ b/api/database/base_class.py @@ -0,0 +1,26 @@ +from typing import Any, Dict + +from sqlalchemy import MetaData +from sqlalchemy.orm import as_declarative + +POSTGRES_INDEXES_NAMING_CONVENTION = { + "ix": "%(column_0_label)s_idx", + "uq": "%(table_name)s_%(column_0_name)s_key", + "ck": "%(table_name)s_%(constraint_name)s_check", + "fk": "%(table_name)s_%(column_0_name)s_fkey", + "pk": "%(table_name)s_pkey", +} + + +metadata = MetaData(naming_convention=POSTGRES_INDEXES_NAMING_CONVENTION) + + +class_registry: Dict[str, Any] = {} + + +@as_declarative(class_registry=class_registry) +class Base: + id: Any + __name__: str + __abstract__: bool = True + metadata = metadata diff --git 
a/api/database/db.py b/api/database/db.py new file mode 100644 index 0000000..c6cbec0 --- /dev/null +++ b/api/database/db.py @@ -0,0 +1,29 @@ +from typing import Generator + +import os +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from log_config.log import get_logger + +POSTGRES_URL = os.getenv("POSTGRES_URL") + +engine = create_engine(POSTGRES_URL) +SessionLocal = sessionmaker(bind=engine) + +log = get_logger(__name__) + + +def db_context() -> Generator: + session = SessionLocal() + try: + yield session + except Exception as e: + log.error( + "An error occurred while getting the database session. Error: " + " %s", e + ) + raise + finally: + log.debug("closing database session") + session.close() diff --git a/api/log_config/log.py b/api/log_config/log.py new file mode 100644 index 0000000..fc79ad2 --- /dev/null +++ b/api/log_config/log.py @@ -0,0 +1,31 @@ +import logging +import sys +from typing import Optional + +LOGGING_FORMATTER = ( + "%(asctime)s [%(funcName)s] [%(filename)s:%(lineno)d]" + " %(levelname)-5s - %(message)s" +) + +DebugLevels = ["DEBUG", "INFO", "WARNING", "ERROR"] +DebugLevelType = str + + +def get_logger( + name: Optional[str] = None, level: DebugLevelType = "DEBUG" +) -> logging.Logger: + logger = logging.getLogger(name=name) + if not logger.hasHandlers(): + handler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter(LOGGING_FORMATTER) + handler.setFormatter(formatter) + logger.addHandler(handler) + + if not level or level not in DebugLevels: + logger.warning( + "Invalid logging level %s. Setting logging level to DEBUG.", level + ) + level = "DEBUG" + + logger.setLevel(level=level) + return logger diff --git a/api/log_config/logging_config.py b/api/log_config/logging_config.py new file mode 100644 index 0000000..c4ad151 --- /dev/null +++ b/api/log_config/logging_config.py @@ -0,0 +1,47 @@ +LOGGING_FORMATTER = ( + "%(asctime)s [%(funcName)s] [%(filename)s:%(lineno)d]" + " %(levelname)-5s - %(message)s" +) + +LOGGING_CONFIG = { + "version": 1, + "disable_existing_loggers": False, + "formatters": { + "default": { + "format": LOGGING_FORMATTER, + }, + }, + "handlers": { + "default": { + "class": "logging.StreamHandler", + "formatter": "default", + "level": "DEBUG", + }, + }, + "root": { + "handlers": ["default"], + "level": "DEBUG", + }, + "loggers": { + "uvicorn": { + "handlers": ["default"], + "level": "INFO", + "propagate": False, + }, + "uvicorn.error": { + "handlers": ["default"], + "level": "INFO", + "propagate": False, + }, + "uvicorn.access": { + "handlers": ["default"], + "level": "INFO", + "propagate": False, + }, + "app": { + "handlers": ["default"], + "level": "DEBUG", + "propagate": False, + }, + }, +} diff --git a/api/main.py b/api/main.py new file mode 100644 index 0000000..dd9973e --- /dev/null +++ b/api/main.py @@ -0,0 +1,17 @@ +from fastapi import FastAPI + +from routers import clients_router, imports_router, status_router +from security.auth.routers import router as auth_router + +import logging.config +from log_config.logging_config import LOGGING_CONFIG + +logging.config.dictConfig(LOGGING_CONFIG) + + +app = FastAPI(title="Client Data Ingestor", version="0.1.0") + +app.include_router(imports_router) +app.include_router(status_router, prefix="/api/imports", tags=["imports"]) +app.include_router(clients_router, prefix="/api/clients", tags=["clients"]) +app.include_router(auth_router, prefix="/auth") diff --git a/api/migrations/README b/api/migrations/README new file mode 100644 index 0000000..98e4f9c 
--- /dev/null +++ b/api/migrations/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/api/migrations/env.py b/api/migrations/env.py new file mode 100644 index 0000000..bf02400 --- /dev/null +++ b/api/migrations/env.py @@ -0,0 +1,56 @@ +import os +from logging.config import fileConfig + +from sqlalchemy import engine_from_config, pool +from alembic import context +from database.base_class import Base +import models + +config = context.config + +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +target_metadata = Base.metadata + +POSTGRES_URL = ( + "postgresql+psycopg://" + f"{os.getenv('POSTGRES_USER')}:{os.getenv('POSTGRES_PASSWORD')}@" + f"{os.getenv('POSTGRES_HOST')}:{os.getenv('POSTGRES_PORT')}/" + f"{os.getenv('POSTGRES_DB')}" +) + + +config.set_main_option("sqlalchemy.url", POSTGRES_URL) + + +def run_migrations_offline(): + context.configure( + url=POSTGRES_URL, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + connectable = engine_from_config( + config.get_section(config.config_ini_section), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + with connectable.connect() as connection: + context.configure( + connection=connection, + target_metadata=target_metadata, + ) + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/api/migrations/script.py.mako b/api/migrations/script.py.mako new file mode 100644 index 0000000..480b130 --- /dev/null +++ b/api/migrations/script.py.mako @@ -0,0 +1,28 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + """Upgrade schema.""" + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + """Downgrade schema.""" + ${downgrades if downgrades else "pass"} diff --git a/api/migrations/versions/4fb49af6985b_init.py b/api/migrations/versions/4fb49af6985b_init.py new file mode 100644 index 0000000..f268e40 --- /dev/null +++ b/api/migrations/versions/4fb49af6985b_init.py @@ -0,0 +1,51 @@ +"""init + +Revision ID: 4fb49af6985b +Revises: +Create Date: 2025-05-25 10:01:36.567292 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '4fb49af6985b' +down_revision: Union[str, None] = None +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema.""" + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table('clients', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('name', sa.String(), nullable=False), + sa.Column('email', sa.String(), nullable=False), + sa.Column('birth_date', sa.Date(), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_clients_birth_date'), 'clients', ['birth_date'], unique=False) + op.create_index(op.f('ix_clients_email'), 'clients', ['email'], unique=False) + op.create_table('import_jobs', + sa.Column('id', sa.UUID(), nullable=False), + sa.Column('status', sa.Enum('pending', 'in_progress', 'completed', 'failed', name='jobstatus'), nullable=False), + sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True), + sa.Column('updated_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True), + sa.Column('row_stats', sa.JSON(), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade schema.""" + # ### commands auto generated by Alembic - please adjust! ### + op.drop_table('import_jobs') + op.drop_index(op.f('ix_clients_email'), table_name='clients') + op.drop_index(op.f('ix_clients_birth_date'), table_name='clients') + op.drop_table('clients') + # ### end Alembic commands ### diff --git a/api/models/__init__.py b/api/models/__init__.py new file mode 100644 index 0000000..07daa39 --- /dev/null +++ b/api/models/__init__.py @@ -0,0 +1,8 @@ +from models.client import Client +from models.import_job import ImportJob, JobStatus + +__all__ = [ + "Client", + "ImportJob", + "JobStatus" +] diff --git a/api/models/client.py b/api/models/client.py new file mode 100644 index 0000000..3c382fb --- /dev/null +++ b/api/models/client.py @@ -0,0 +1,11 @@ +from sqlalchemy import Column, Integer, String, Date +from database.base_class import Base + + +class Client(Base): + __tablename__ = "clients" + + id = Column(Integer, primary_key=True, autoincrement=True) + name = Column(String, nullable=False) + email = Column(String, index=True, nullable=False) + birth_date = Column(Date, index=True, nullable=False) diff --git a/api/models/import_job.py b/api/models/import_job.py new file mode 100644 index 0000000..e49482d --- /dev/null +++ b/api/models/import_job.py @@ -0,0 +1,29 @@ +import enum +import uuid + +from sqlalchemy import Column, DateTime, Enum, JSON, func +from sqlalchemy.dialects.postgresql import UUID + +from database.base_class import Base + + +class JobStatus(enum.Enum): + pending = "pending" + in_progress = "in_progress" + completed = "completed" + failed = "failed" + + +class ImportJob(Base): + __tablename__ = "import_jobs" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + status = Column(Enum(JobStatus), nullable=False, default=JobStatus.pending) + created_at = Column(DateTime(timezone=True), server_default=func.now()) + updated_at = Column( + DateTime(timezone=True), server_default=func.now(), onupdate=func.now() + ) + row_stats = Column( + JSON, nullable=False, default=lambda: + {"total": 0, "valid": 0, "errors": 0} + ) diff --git a/api/requirements.txt b/api/requirements.txt new file mode 100644 index 0000000..95a1707 --- /dev/null +++ b/api/requirements.txt @@ -0,0 +1,21 @@ +celery==5.3.1 +fastapi>=0.95 +pandas +psycopg[binary] +redis==4.6.0 +requests==2.31.0 +SQLAlchemy>=2.0,<2.1 +uvicorn==0.22.0 +importlib-metadata==6.0.0 +python-multipart +pydantic[email] + +alembic>=1.10,<2.0 + +pytest +starlette>=0.26 +httpx>=0.23 +pytest-cov 
+coverage + +pyjwt diff --git a/api/routers/__init__.py b/api/routers/__init__.py new file mode 100644 index 0000000..c530494 --- /dev/null +++ b/api/routers/__init__.py @@ -0,0 +1,10 @@ +from routers.clients import router as clients_router +from routers.imports import router as imports_router +from routers.status import router as status_router + + +__all__ = [ + "clients_router", + "imports_router", + "status_router", +] diff --git a/api/routers/clients.py b/api/routers/clients.py new file mode 100644 index 0000000..e43a8c9 --- /dev/null +++ b/api/routers/clients.py @@ -0,0 +1,58 @@ +from fastapi import APIRouter, Depends, Query, status, HTTPException +from sqlalchemy.orm import Session + +from crud import client_crud +from database.db import db_context +from log_config.log import get_logger +from schemas.clients import ClientResponse, PaginatedClientsResponse +from security.auth.token_utils import verify_token +router = APIRouter() + +log = get_logger(__name__) + + +@router.get( + "", + response_model=PaginatedClientsResponse, + status_code=status.HTTP_200_OK +) +def list_clients( + page: int = Query(1, ge=1), + per_page: int = Query(20, ge=1), + db: Session = Depends(db_context), + payload: dict = Depends(verify_token) +): + log.info( + "Received request to list clients (page=%d, per_page=%d)", + page, per_page + ) + + try: + pagination_result = client_crud.get_paginated( + db, skip=(page - 1) * per_page, limit=per_page) + + if not pagination_result or "items" not in pagination_result: + log.error("Invalid pagination result from client_crud") + raise HTTPException(status_code=500, + detail="Failed to retrieve clients") + + clients = pagination_result["items"] + total_clients = pagination_result["total"] + total_pages = (total_clients + per_page - 1) // per_page + + log.debug( + "Total clients found: %d, returning page %d/%d", + total_clients, + page, + total_pages, + ) + + return PaginatedClientsResponse( + clients=[ClientResponse.from_orm(client) for client in clients], + page=page, + total_pages=total_pages, + ) + + except Exception as e: + log.error("Failed to list clients: %s", str(e), exc_info=True) + raise HTTPException(status_code=500, detail="Internal server error") diff --git a/api/routers/imports.py b/api/routers/imports.py new file mode 100644 index 0000000..77f5609 --- /dev/null +++ b/api/routers/imports.py @@ -0,0 +1,65 @@ +import os +import shutil + +from fastapi import APIRouter, Depends, File, UploadFile, HTTPException, status +from sqlalchemy.orm import Session + +from database.db import db_context +from models import ImportJob, JobStatus +from log_config.log import get_logger +from schemas.imports import ImportJobResponse +from tasks import run_import_job +from security.auth.token_utils import verify_token + +router = APIRouter(prefix="/api/imports", tags=["imports"]) + +log = get_logger(__name__) + + +@router.post("", response_model=ImportJobResponse) +def import_clients( + file: UploadFile = File(...), + db: Session = Depends(db_context), + _: dict = Depends(verify_token) +): + shared_dir = "/shared_data" + try: + os.makedirs(shared_dir, exist_ok=True) + temp_path = os.path.join(shared_dir, file.filename) + log.info("Saving uploaded file to: %s", temp_path) + with open(temp_path, "wb") as buffer: + shutil.copyfileobj(file.file, buffer) + except Exception as e: + log.error("Failed to save uploaded file: %s", e, exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to save uploaded file.", + ) + + try: + job = 
ImportJob(status=JobStatus.pending) # type: ignore + db.add(job) + db.commit() + db.refresh(job) + log.info("Created import job with ID %s", job.id) + except Exception as e: + db.rollback() + log.error("Failed to create import job: %s", e, exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to create import job.", + ) + + try: + run_import_job.delay(job.id, temp_path) + log.info("Dispatched import job %s with file %s", job.id, temp_path) + except Exception as e: + log.error("Failed to dispatch import job %s: %s", job.id, e, + exc_info=True + ) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to start import job.", + ) + + return ImportJobResponse(job_id=job.id, status=job.status.value) diff --git a/api/routers/status.py b/api/routers/status.py new file mode 100644 index 0000000..cb126c7 --- /dev/null +++ b/api/routers/status.py @@ -0,0 +1,38 @@ +from uuid import UUID +from fastapi import APIRouter, Depends, HTTPException, status + +from crud import import_job_crud +from database.db import db_context +from log_config.log import get_logger +from schemas.status import ImportJobStatusResponse +from security.auth.token_utils import verify_token + +log = get_logger(__name__) + + +router = APIRouter() + + +@router.get( + "/{job_id}/status", + response_model=ImportJobStatusResponse, + status_code=status.HTTP_200_OK, +) +def get_import_status( + job_id: UUID, + db=Depends(db_context), + _: dict = Depends(verify_token), +): + job = import_job_crud.get_one(db, job_id) # type: ignore[call-arg] + if not job: + raise HTTPException(status_code=404, detail="Import job not found") + + row_stats = job.row_stats or {"total": 0, "valid": 0, "errors": 0} # type: ignore[call-arg] + + return ImportJobStatusResponse( + job_id=job.id, # type: ignore[call-arg] + status=job.status.name.upper(), # type: ignore[call-arg] + total=row_stats.get("total", 0), + valid=row_stats.get("valid", 0), + errors=row_stats.get("errors", 0), + ) diff --git a/api/schemas/clients.py b/api/schemas/clients.py new file mode 100644 index 0000000..9b5f605 --- /dev/null +++ b/api/schemas/clients.py @@ -0,0 +1,26 @@ +from datetime import date +from typing import List, Optional + +from pydantic import BaseModel, EmailStr + + +class ClientSchema(BaseModel): + name: str + email: EmailStr + birth_date: Optional[date] + + +class ClientResponse(BaseModel): + id: int + name: str + email: str + birth_date: date + + class Config: + from_attributes = True + + +class PaginatedClientsResponse(BaseModel): + clients: List[ClientResponse] + page: int + total_pages: int diff --git a/api/schemas/imports.py b/api/schemas/imports.py new file mode 100644 index 0000000..dedc2e3 --- /dev/null +++ b/api/schemas/imports.py @@ -0,0 +1,10 @@ +from uuid import UUID +from pydantic import BaseModel + + +class ImportJobResponse(BaseModel): + job_id: UUID + status: str + + class Config: + from_attributes = True diff --git a/api/schemas/status.py b/api/schemas/status.py new file mode 100644 index 0000000..e91b94d --- /dev/null +++ b/api/schemas/status.py @@ -0,0 +1,12 @@ +from uuid import UUID +from typing import Literal + +from pydantic import BaseModel + + +class ImportJobStatusResponse(BaseModel): + job_id: UUID + status: Literal["PENDING", "IN_PROGRESS", "COMPLETED", "FAILED"] + total: int + valid: int + errors: int diff --git a/api/security/auth/routers.py b/api/security/auth/routers.py new file mode 100644 index 0000000..885bd57 --- /dev/null +++ 
b/api/security/auth/routers.py @@ -0,0 +1,32 @@ +import os +from fastapi import APIRouter, Form, HTTPException +from fastapi.responses import JSONResponse +from security.auth.token_utils import ( + create_access_token, + validate_client_credentials + ) +from log_config.log import get_logger + +router = APIRouter() +log = get_logger(__name__) + + +@router.post("/access_token") +def access_token( + grant_type: str = Form(...), + client_id: str = Form(...), + client_secret: str = Form(...) +): + if grant_type != "client_credentials": + raise HTTPException(status_code=400, detail="Unsupported grant_type") + if not validate_client_credentials(client_id, client_secret): + raise HTTPException(status_code=401, + detail="Invalid client credentials") + + token = create_access_token(client_id) + log.info(f"Token generated for client_id={client_id}") + return JSONResponse({ + "access_token": token, + "token_type": "bearer", + "expires_in": int(os.getenv("ACCESS_TOKEN_EXPIRE_SECONDS", 3600)) + }) diff --git a/api/security/auth/token_utils.py b/api/security/auth/token_utils.py new file mode 100644 index 0000000..efba1e6 --- /dev/null +++ b/api/security/auth/token_utils.py @@ -0,0 +1,47 @@ +import os +from datetime import datetime, timedelta +import jwt +from fastapi import HTTPException, Request + +VALID_CLIENT_ID = os.getenv("VALID_CLIENT_ID") +VALID_CLIENT_SECRET = os.getenv("VALID_CLIENT_SECRET") +JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY") +JWT_ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256") +ACCESS_TOKEN_EXPIRE_SECONDS = int(os.getenv("ACCESS_TOKEN_EXPIRE_SECONDS", 3600)) + + +def create_access_token(client_id: str) -> str: + expire = datetime.utcnow() + timedelta(seconds=ACCESS_TOKEN_EXPIRE_SECONDS) + payload = { + "sub": client_id, + "exp": expire, + "iat": datetime.utcnow(), + } + + token = jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM) + return token.decode("utf-8") if isinstance(token, bytes) else token + + +def verify_token(request: Request): + auth_header = request.headers.get("Authorization") + + if not auth_header or not auth_header.startswith("Bearer "): + raise HTTPException(status_code=401, + detail="Missing or invalid Authorization header") + + token = auth_header.split(" ")[1] + + try: + payload = jwt.decode(token, JWT_SECRET_KEY, algorithms=[JWT_ALGORITHM]) + return payload + except jwt.ExpiredSignatureError: + raise HTTPException(status_code=401, detail="Token expired") + except jwt.InvalidTokenError: + raise HTTPException(status_code=401, detail="Invalid token") + + +def validate_client_credentials(client_id: str, client_secret: str) -> bool: + return ( + client_id == VALID_CLIENT_ID + and client_secret == VALID_CLIENT_SECRET + ) diff --git a/api/services/import_service.py b/api/services/import_service.py new file mode 100644 index 0000000..cb51d8a --- /dev/null +++ b/api/services/import_service.py @@ -0,0 +1,60 @@ +from datetime import datetime + +import pandas as pd +from pydantic import ValidationError +from sqlalchemy.orm import Session + +from log_config.log import get_logger +from models import Client +from models import ImportJob, JobStatus +from schemas.clients import ClientSchema + +log = get_logger(__name__) + + +def parse_date(date_str): + try: + if not date_str or pd.isna(date_str): + return None + return datetime.strptime(date_str, "%Y-%m-%d").date() + except Exception as e: + log.warning("Failed to parse date '%s': %s", date_str, e) + return None + + +def process_csv(filepath: str, job_id: int, db: Session): + job = db.query(ImportJob).get(job_id) + 
job.status = JobStatus.in_progress + db.commit() + + try: + df = pd.read_csv(filepath, skipinitialspace=True) + except Exception as e: + log.error("Failed to read CSV file '%s': %s", filepath, e) + job.status = JobStatus.failed + db.commit() + return + + total, valid, errors = 0, 0, 0 + + for i, row in df.iterrows(): + total += 1 + try: + client = ClientSchema( + name=row["nom"], + email=row["email"], + birth_date=parse_date(row["date de naissance"]), + ) + db.add(Client(**client.dict())) + valid += 1 + except (ValidationError, ValueError) as e: + errors += 1 + log.warning("Validation error at row %d: %s", i, e) + + job.row_stats = {"total": total, "valid": valid, "errors": errors} + job.status = ( + JobStatus.completed + if errors == 0 + else JobStatus.failed if valid == 0 else JobStatus.completed + ) + db.commit() diff --git a/api/tasks.py b/api/tasks.py new file mode 100644 index 0000000..540242f --- /dev/null +++ b/api/tasks.py @@ -0,0 +1,42 @@ +import os +import logging.config + +from celery import Celery +from sqlalchemy.exc import SQLAlchemyError + +from database.db import SessionLocal +from log_config.logging_config import LOGGING_CONFIG +from log_config.log import get_logger +from services.import_service import process_csv + +logging.config.dictConfig(LOGGING_CONFIG) + +log = get_logger(__name__) + + +BROKER_URL = os.getenv("CELERY_BROKER_URL") +RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND") + +app = Celery("tasks", broker=BROKER_URL, backend=RESULT_BACKEND) + + +@app.task( + bind=True, + autoretry_for=(SQLAlchemyError, IOError, Exception), + retry_kwargs={"max_retries": 3, "countdown": 10}, + retry_backoff=True, +) +def run_import_job(self, job_id: int, filepath: str) -> None: + log.info("Starting import job %d with file: %s", job_id, filepath) + + try: + with SessionLocal() as db: + process_csv(filepath, job_id, db) + log.info("Import job %d completed successfully.", job_id) + + except Exception as e: + log.error( + "Import job %d failed. 
Error: %s", job_id, str(e), + exc_info=True + ) + raise self.retry(exc=e) diff --git a/api/tests/__init__.py b/api/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/tests/routers/__init__.py b/api/tests/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/tests/routers/test_routers.py b/api/tests/routers/test_routers.py new file mode 100644 index 0000000..be53574 --- /dev/null +++ b/api/tests/routers/test_routers.py @@ -0,0 +1,251 @@ +import io +import pathlib +import shutil +from datetime import date +from unittest.mock import patch +from uuid import uuid4 + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from types import SimpleNamespace + +from main import app +from database.base_class import Base +from database.db import db_context +from models import Client, ImportJob, JobStatus +from security.auth.token_utils import verify_token + + +# --- Mock authentication --- +def fake_get_current_user(): + return SimpleNamespace(id=1, username="testuser") + + +app.dependency_overrides[verify_token] = fake_get_current_user + + +# --- In-memory DB configuration for tests --- +SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:" + +engine = create_engine( + SQLALCHEMY_DATABASE_URL, + connect_args={"check_same_thread": False} +) +connection = engine.connect() +Base.metadata.create_all(bind=connection) + +TestingSessionLocal = sessionmaker( + bind=connection, + autocommit=False, + autoflush=False, +) + + +def override_db_context(): + db = TestingSessionLocal() + try: + yield db + finally: + db.close() + + +app.dependency_overrides[db_context] = override_db_context +client = TestClient(app) + + +@pytest.fixture(scope="session", autouse=True) +def setup_database(): + yield + Base.metadata.drop_all(bind=connection) + connection.close() + + +@pytest.fixture(scope="function") +def seed_clients(): + db = TestingSessionLocal() + clients = [ + Client( + name="Alice", # type: ignore[call-arg] + email="alice@example.com", # type: ignore[call-arg] + birth_date=date(1990, 1, 1), # type: ignore[call-arg] + ), + Client( + name="Bob", # type: ignore[call-arg] + email="bob@example.com", # type: ignore[call-arg] + birth_date=date(1985, 5, 20), # type: ignore[call-arg] + ), + Client( + name="Charlie", # type: ignore[call-arg] + email="charlie@example.com", # type: ignore[call-arg] + birth_date=date(2000, 12, 31), # type: ignore[call-arg] + ), + ] + db.add_all(clients) + db.commit() + yield + db.query(Client).delete() + db.commit() + db.close() + + +@pytest.fixture(scope="function") +def seed_import_jobs(): + db = TestingSessionLocal() + job1 = ImportJob( + id=uuid4(), # type: ignore[call-arg] + status=JobStatus.completed, # type: ignore[call-arg] + row_stats={"total": 100, "valid": 90, "errors": 10}, # type: ignore[call-arg] + ) + job2 = ImportJob( + id=uuid4(), # type: ignore[call-arg] + status=JobStatus.in_progress, # type: ignore[call-arg] + row_stats=None, # type: ignore[call-arg] + ) + db.add_all([job1, job2]) + db.commit() + yield job1, job2 + db.query(ImportJob).delete() + db.commit() + db.close() + + +@pytest.fixture(autouse=True) +def cleanup_shared_dir(): + shared_dir = pathlib.Path("/shared_data") + yield + if shared_dir.exists(): + for f in shared_dir.iterdir(): + try: + if f.is_file(): + f.unlink() + elif f.is_dir(): + shutil.rmtree(f) + except Exception as e: + print(f"Failed to clean up file: {f} - {e}") + + +# --- Tests Clients --- + +def 
test_get_clients_success(seed_clients): + response = client.get("/api/clients?page=1&per_page=2") + assert response.status_code == 200 + data = response.json() + assert data["page"] == 1 + assert data["total_pages"] == 2 + assert len(data["clients"]) == 2 + + +def test_get_clients_page_2(seed_clients): + response = client.get("/api/clients?page=2&per_page=2") + assert response.status_code == 200 + data = response.json() + assert data["page"] == 2 + assert len(data["clients"]) == 1 + + +def test_get_clients_page_too_high(): + response = client.get("/api/clients?page=5&per_page=2") + assert response.status_code == 200 + data = response.json() + assert data["page"] == 5 + assert data["clients"] == [] + + +def test_get_clients_invalid_per_page(): + response = client.get("/api/clients?page=1&per_page=0") + assert response.status_code == 422 + + +def test_get_clients_invalid_page(): + response = client.get("/api/clients?page=-1&per_page=2") + assert response.status_code == 422 + + +def test_get_clients_empty(): + db = TestingSessionLocal() + db.query(Client).delete() + db.commit() + db.close() + response = client.get("/api/clients?page=1&per_page=10") + assert response.status_code == 200 + data = response.json() + assert data["clients"] == [] + assert data["page"] == 1 + assert data["total_pages"] == 0 + + +# --- Tests ImportJob --- + +def test_get_import_status_success(seed_import_jobs): + job1, _ = seed_import_jobs + response = client.get(f"/api/imports/{job1.id}/status") + assert response.status_code == 200 + data = response.json() + assert data["job_id"] == str(job1.id) + assert data["status"] == job1.status.name.upper() + assert data["total"] == 100 + assert data["valid"] == 90 + assert data["errors"] == 10 + + +def test_get_import_status_in_progress(seed_import_jobs): + _, job2 = seed_import_jobs + response = client.get(f"/api/imports/{job2.id}/status") + assert response.status_code == 200 + data = response.json() + assert data["job_id"] == str(job2.id) + assert data["status"] == job2.status.name.upper() + assert data["total"] == 0 + assert data["valid"] == 0 + assert data["errors"] == 0 + + +def test_get_import_status_not_found(): + fake_uuid = uuid4() + response = client.get(f"/api/imports/{fake_uuid}/status") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == "Import job not found" + + +# --- Tests Imports POST --- + +@patch("routers.imports.run_import_job.delay") +def test_import_clients_success(mock_delay): + content = b"name,email,birth_date\nAlice,alice@example.com,1990-01-01" + file = {"file": ("clients.csv", io.BytesIO(content), "text/csv")} + response = client.post("/api/imports", files=file) + assert response.status_code == 200 + data = response.json() + assert "job_id" in data + assert data["status"] == "pending" + mock_delay.assert_called_once() + + +@patch("routers.imports.run_import_job.delay") +def test_import_clients_invalid_file_save(mock_delay, monkeypatch): + monkeypatch.setattr("os.makedirs", lambda *args, **kwargs: (_ for _ in ()).throw(OSError("fail"))) + content = b"test" + file = {"file": ("fail.csv", io.BytesIO(content), "text/csv")} + response = client.post("/api/imports", files=file) + + assert response.status_code == 500 + assert response.json()["detail"] == "Failed to save uploaded file." 
+ + +@patch("routers.imports.run_import_job.delay", + side_effect=Exception("Dispatch failed") + ) +def test_import_clients_dispatch_fails(mock_delay): + content = b"name,email,birth_date\nAlice,alice@example.com,1990-01-01" + file = {"file": ("clients.csv", io.BytesIO(content), "text/csv")} + response = client.post("/api/imports", files=file) + assert response.status_code == 500 + assert response.json()["detail"] == "Failed to start import job." + + +def test_import_clients_missing_file(): + response = client.post("/api/imports", files={}) + assert response.status_code == 422 diff --git a/api/tests/test_tasks.py b/api/tests/test_tasks.py new file mode 100644 index 0000000..48c9782 --- /dev/null +++ b/api/tests/test_tasks.py @@ -0,0 +1,43 @@ +import pytest +from celery.exceptions import Retry +from unittest.mock import patch, MagicMock +from tasks import run_import_job + + +@pytest.fixture +def mock_session_local(): + with patch("tasks.SessionLocal") as mock: + mock_instance = MagicMock() + mock.return_value.__enter__.return_value = mock_instance + yield mock_instance + + +@pytest.fixture +def mock_process_csv(): + with patch("tasks.process_csv") as mock: + yield mock + + +@pytest.fixture +def mock_retry(): + with patch.object(run_import_job, "retry") as mock: + mock.side_effect = Retry("retry called") + yield mock + + +def test_run_import_job_success(mock_session_local, mock_process_csv): + run_import_job.run(1, "fakefile.csv") + mock_process_csv.assert_called_once_with( + "fakefile.csv", 1, mock_session_local + ) + + +def test_run_import_job_failure_retry( + mock_session_local, + mock_process_csv, + mock_retry, +): + mock_process_csv.side_effect = Exception("fail") + with pytest.raises(Retry): + run_import_job.run(1, "fakefile.csv") + mock_retry.assert_called_once() diff --git a/clients_utf8.csv b/clients_utf8.csv new file mode 100644 index 0000000..f26845b --- /dev/null +++ b/clients_utf8.csv @@ -0,0 +1,11 @@ +nom,email,date de naissance +Alice Dupont,alice.dupont@example.com,1990-04-12 +Bob Martin,bob.martin@example.org,1985-06-23 +Claire Leroy,claire.leroy@example.net,1992-11-15 +David Moreau,david.moreau@example.com,1988-09-08 +Emma Caron,emma.caron@example.org,1995-02-02 +Fabien Noel,fabien.noel@example.net,1983-07-19 +Gwen Roussel,gwen.roussel@example.com,1991-05-30 +Hugo Lopez,hugo.lopez@example.org,1987-03-14 +Isabelle Fabre,isabelle.fabre@example.net,1994-10-01 +Julien Mercier,julien.mercier@example.com,1989-12-25 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..8d5ae84 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,55 @@ +services: + app: + container_name: app_container + build: ./api + ports: + - "8000:8000" + command: uvicorn main:app --host 0.0.0.0 --reload + volumes: + - ./api:/usr/src/app + - shared_data:/shared_data + environment: + - CELERY_BROKER_URL=${CELERY_BROKER_URL} + - CELERY_RESULT_BACKEND=${CELERY_RESULT_BACKEND} + depends_on: + - redis + - database + env_file: + - .env + + worker: + container_name: worker_container + build: ./api + command: celery -A tasks worker + volumes: + - ./api:/usr/src/app + - shared_data:/shared_data + environment: + - CELERY_BROKER_URL=${CELERY_BROKER_URL} + - CELERY_RESULT_BACKEND=${CELERY_RESULT_BACKEND} + depends_on: + - app + - redis + - database + env_file: + - .env + + redis: + container_name: redis_container + image: redis:latest + + database: + container_name: database_container + image: postgres:latest + volumes: + - postgres_data:/var/lib/postgresql/data/ 
+ environment: + - POSTGRES_USER=${POSTGRES_USER} + - POSTGRES_PASSWORD=${POSTGRES_PASSWORD} + - POSTGRES_DB=${POSTGRES_DB} + ports: + - "5432:5432" + +volumes: + postgres_data: + shared_data: diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 0000000..2494728 --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,8 @@ +{ + "typeCheckingMode": "basic", + "reportMissingImports": false, + "reportMissingTypeStubs": false, + "include": [ + "api" + ] +} \ No newline at end of file
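Finally, the manual curl flow documented in the README can also be scripted end to end; a minimal sketch using `requests` (already listed in `requirements.txt`), assuming the stack is up on localhost:8000 and the client credentials are exported as environment variables:

```python
import os
import requests

BASE = "http://localhost:8000"

# 1. Obtain a bearer token via the client-credentials grant exposed at /auth/access_token
resp = requests.post(
    f"{BASE}/auth/access_token",
    data={
        "grant_type": "client_credentials",
        "client_id": os.environ["VALID_CLIENT_ID"],
        "client_secret": os.environ["VALID_CLIENT_SECRET"],
    },
)
resp.raise_for_status()
headers = {"Authorization": f"Bearer {resp.json()['access_token']}"}

# 2. Upload the sample CSV and read back the job id
with open("clients_utf8.csv", "rb") as f:
    job = requests.post(f"{BASE}/api/imports", files={"file": f}, headers=headers).json()

# 3. Poll the job status, then list the imported clients
print(requests.get(f"{BASE}/api/imports/{job['job_id']}/status", headers=headers).json())
print(requests.get(f"{BASE}/api/clients", params={"page": 1, "per_page": 20}, headers=headers).json())
```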