On part des modèles
Ensuite on fait les DTO et les Forms
Après on fait les services avec leur fonctions
Et en dernier le controller
!!! En cas de modification du modèle !!!
Toujours faire une migration
-> flask db migrate -m "message"
-> flask db upgrade
Les modèles sont uniquement utilisé pour communiqué avec la DB et modifier les informations de la DB.
from app import db
class User(db.Model):
__tablename__ = 'users'
userid = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(255), unique=True, nullable=False)
password = db.Column(db.String(255), nullable=False)
lastname = db.Column(db.String(255), nullable=True)
firstname = db.Column(db.String(255), nullable=True)Vont récupérer les données envoyées par l'utilisateur au travers des body de la requête. Et les forms vont aussi faire la validation de ces données, donc vérifier que les contraintes d'intégrité sont bien respectées.
from flask_wtf import FlaskForm
from wtforms import StringField
from wtforms.validators import DataRequired
class UserRegisterForm(FlaskForm):
class Meta:
csrf = False
username = StringField('username', validators=[DataRequired()])
password = StringField('password', validators=[DataRequired()])Les DTOs sont les objets qui contiendront les données envoyée à l'utilisateur. Ceux-ci peuvent être utilisé entre autre pour filtrer des données ne devant pas être renvoyée (example le password de l'utilisateur). Ils peuvent contenir des données supplémentaires (example le nom complèt d'une personne)
from copy import deepcopy
from app.models.user import User
class UserDTO:
def __init__(self, user: User):
self.id = user.userid
self.username = user.username
self.firstname = user.firstname
self.lastname = user.lastname
self.fullname = f"{user.firstname} {user.lastname}" if user.firstname and user.lastname else None
def serialize(self):
dto = deepcopy(self)
return dto.__dict__Les controlleurs vont habituellement avoir au minimum les routes suivantes :
GET ALL /nom_de_l_entite : récupère toutes les entités du type en DB
@app.get('/users')
def get_users():
user_service = UserService()
return jsonify([u.serialize() for u in user_service.find_all()])GET /nom_entitee/ID => récupère une entité par son ID
@app.get('/users/<int:userid>')
def get_user(userid: int):
user_service = UserService()
user = user_service.find_one(userid)
return jsonify(user.serialize()) if user else ('', 404)POST /nom_entite => ajoute une nouvelle entité en DB
@app.post('/users')
def post_user():
user_service = UserService()
form = UserRegisterForm.from_json(request.json)
if form.validate():
user = user_service.insert(form)
return jsonify(user.serialize())
return jsonify(form.errors)PUT /nom_entite/ID => permet de modifier une entité en DB
@app.put('/users/<int:userid>')
def put_user(userid: int):
user_service = UserService()
form = UserUpdateForm.from_json(request.json)
if form.validate():
user = user_service.update(userid, form)
return jsonify(user.serialize())
return jsonify(form.errors)DELETE /nom_entite/ID : Servira pour supprimer une entité par ID
@app.delete('/users/<int:userid>')
def delete_user(userid: int):
user_service = UserService()
user = user_service.find_one(userid)
if user:
user_service.delete(user)
return ('', 204)
return ('', 404)L’API renvoie un code standard dans chaque réponse pour indiquer l’état du traitement :
| Code | Signification | Description |
|---|---|---|
| 200 | OK | La requête a été traitée avec succès. |
| 201 | Created | Une nouvelle ressource a été créée. |
| 400 | Bad Request | La requête est invalide ou contient des paramètres incorrects. |
| 401 | Unauthorized | Authentification requise ou jeton invalide. |
| 403 | Forbidden | L’accès à la ressource demandée est refusé. |
| 404 | Not Found | La ressource demandée n’existe pas. |
| 409 | Conflict | Conflit avec l’état actuel de la ressource (ex. doublon). |
| 422 | Unprocessable Entity | Données valides mais impossibles à traiter. |
| 500 | Internal Server Error | Une erreur interne est survenue sur le serveur. |
| 503 | Service Unavailable | Le service est temporairement indisponible (ex. maintenance). |
Les modès sont créé via SQLAlchemy et les migrations sont gérée par Alembic
Après avoir modifier ou créé une entité il faut absolument générer une migration et l'exécuter!
=> Fichiers Models/user.py Models/Basket.py
Ici on va déclarer des variables static qui vont définir les différents parametres de notre table :
tablename => nom de la table
pour les champs : nomDuChamp: Mapped[TYPE] = mapped_column(?contraintes?)
Exemple :
id: Mapped[int] = mapped_column(primary_key=True)
Exemple créer un champ username d'une longueur max de 50 et avec le flag unique :
username: Mapped[String] = mapped_column(String(50), unique=True)
from sqlalchemy.orm import Mapped
from sqlalchemy.testing.schema import mapped_column
from Models.Base import Base
class User(Base):
__tablename__ = 'users'
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(unique=True)
password: Mapped[str] = mapped_column()from sqlalchemy import Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import relationship
# On définit la clé étrangère !! users.id = nom_de_la_table.nom_colonne
user_id = Column(Integer, ForeignKey('users.id'))
# Optionel On défini la relation des objets entre eux
# Ici je dis que l'objet BasketItem contient une relation qui cible l'objet User
# le back_population est optionel
# !! Le back_population est le nom du champ dans l'objet !!
# Pour la relationship, je passe le nom de l'objet sous forme de string,
# pour éviter les boucles d'import example :
# Le fichier user.py qui import la classe BasketItem de basket_item.py
# et le fichier basket_item.py qui importe la classe User de user.py
user = relationship('User', back_populates='basket_items') user_id: Mapped[int] = mapped_column(ForeignKey('users.id'))
user = relationship('User', back_populates='basket_items')Du coté de l'entité ciblée :
# l'option cascade va me permettre de dire comment et quels opération faire en cascade
# - all => toutes les opération, insert, update, delete
# - save-update => on cascade les insert et update
# - delete => uniquement cascade les delete
# - delete-orphan => supprime les éléments qui ne sont plus lié à un parent automatiquement.
basket_items = relationship('BasketItem', back_populates='user', cascade='all, delete-orphan')Créer une table d'association
from sqlalchemy import Table, Integer, Column, ForeignKey
from Models.Base import Base
user_roles = Table('user_roles', Base.metadata,
Column('user_id', Integer, ForeignKey('users.id')),
Column('role_id', Integer, ForeignKey('roles.id'))
)Faire les relations dans les entités respectives.
Dans l'entité User
roles = relationship('Role', back_populates='users', secondary='user_roles')Dans l'entité Role
users = relationship('User', back_populates='roles', secondary='user_roles')Dans ce cas ci il faut faire une entité supplémentaires, stockant les données
from sqlalchemy import Column, Integer, String, Float, ForeignKey
from sqlalchemy.orm import relationship
from Models.Base import Base
class BasketItem(Base):
__tablename__ = 'basketitems'
id = Column(Integer, primary_key=True)
quantity = Column(Integer, nullable=False)
item_id = Column(Integer, ForeignKey('items.id'))
user_id = Column(Integer, ForeignKey('users.id'))
user = relationship('User', back_populates='basket_items')
item = relationship('Item', back_populates='basket_items')Utilisation de SqlAlchemy pour les select :
# 1. Récupérer tous les enregistrements
users = User.query.all()
print("Tous les utilisateurs:", users)
# 2. Filtrer (WHERE)
alice = User.query.filter_by(username="alice").first()
print("Utilisateur Alice:", alice)
# 3. Filtre plus complexe
emails = User.query.filter(User.email.like("%@example.com")).all()
print("Tous les emails @example.com:", emails)
# 4. ORDER BY
ordered_users = User.query.order_by(User.username.desc()).all()
print("Utilisateurs triés:", ordered_users)
# 5. LIMIT
first_two = User.query.limit(2).all()
print("Deux premiers:", first_two)
# 6. OFFSET + LIMIT (Pagination)
second_page = User.query.offset(2).limit(2).all()
print("Page 2:", second_page)
# 7. COUNT
count = User.query.count()
print("Nombre total d’utilisateurs:", count)
# 8. Projection (sélection de colonnes spécifiques)
usernames = db.session.query(User.username).all()
print("Liste des usernames:", usernames)
# 9. Filtre In
users = User.query.filter(User.username.in_(['alice', 'test'])).all()
print("Liste des utilisateurs:", emails)initialiser la DB
flask db initgénérer une migration
flask db migrate -m "nom de la migration"
OU
flask db revision --autogenerate -m "nom de la migration"Mettre à jour la DB
flask db upgrade- des fonctions pour contacter la db :
- findOne => retourne une entité en fonction de son id
- findOneBy => retourne une entité en fonction d'un
paramètre passé (données d'une des colonnes)
- findAll => retourne toutes les data
- update => met à jours les donnés en DB
- insert => insert des nouvelles donnée en DB
- delete => delete une entrée en DB
class UserService:
def find_all(self):
# User.query.all() -> permet de récupérer toutes les données en DB
return [UserDTO(u) for u in User.query.all()]
def find_one(self, id):
# User.query.filter_by(champ_de_l_objet=valeur) -> permet de récupérer les entités
# respectant une certaine valeur de champ
# le .first() permet de récupérer seulement le première élément
user = User.query.filter_by(userid=id).first()
return UserDTO(user) if user else None
def insert(self, form: UserRegisterForm):
user = User(
username=form.username.data,
password=form.password.data
)
# db.session.add(entité) -> permet d'ajouter une entité en DB via la transaction en cours
db.session.add(user)
# db.session.commit() -> confirme les modification et applique la transaction en DB
db.session.commit()
return UserDTO(user)
def update(self, id, form: UserUpdateForm):
user = User.query.filter_by(userid=id).first()
user.lastname = form.lastname.data
user.firstname = form.firstname.data
if form.password.data:
user.password = form.password.data
# db.session.commit() -> confirme les modification et applique la transaction en DB
db.session.commit()
return UserDTO(user)
def delete(self, user: User):
# db.session -> créé une transaction
# db.session.delete(entité) -> supprimera l'entité
db.session.delete(user)
# db.session.commit() -> confirme les modification et applique la transaction en DB
db.session.commit()
return UserDTO(user)