Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
687 changes: 687 additions & 0 deletions test-apps/03-fastapi-blog-api/app.py

Large diffs are not rendered by default.

152 changes: 152 additions & 0 deletions test-apps/03-fastapi-blog-api/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
"""
JWT Authentication module for FastAPI Blog API.
Uses FastAPI's Dependency Injection instead of Flask's g/decorators.
"""
import logging
from typing import Optional

import bcrypt
import jwt
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from config import Settings

logger = logging.getLogger(__name__)

# HTTPBearer with auto_error=False so optional auth returns None instead of 401
_bearer_scheme = HTTPBearer(auto_error=False)


class AuthManager:
"""Manages JWT authentication operations."""

def __init__(self, cfg: Settings) -> None:
self.secret_key = cfg.JWT_SECRET_KEY
self.algorithm = cfg.JWT_ALGORITHM
self.expiration = cfg.JWT_EXPIRATION

# ------------------------------------------------------------------
# Password helpers
# ------------------------------------------------------------------

def hash_password(self, password: str) -> str:
"""Hash a plain-text password using bcrypt."""
salt = bcrypt.gensalt()
return bcrypt.hashpw(password.encode(), salt).decode()

def verify_password(self, password: str, hashed: str) -> bool:
"""Verify a plain-text password against its bcrypt hash."""
return bcrypt.checkpw(password.encode(), hashed.encode())

# ------------------------------------------------------------------
# Token helpers
# ------------------------------------------------------------------

def generate_token(self, user_id: str, username: str, role: str) -> str:
"""Generate a signed JWT for the given user."""
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
payload = {
"user_id": user_id,
"username": username,
"role": role,
"iat": now,
"exp": now + self.expiration,
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

def decode_token(self, token: str) -> dict:
"""
Decode and verify a JWT.

Raises:
jwt.ExpiredSignatureError: token has expired
jwt.InvalidTokenError: token is otherwise invalid
"""
return jwt.decode(token, self.secret_key, algorithms=[self.algorithm])


# ---------------------------------------------------------------------------
# FastAPI dependency: retrieve AuthManager from app.state
# ---------------------------------------------------------------------------

def get_auth_manager(request: Request) -> AuthManager:
"""FastAPI dependency — returns the shared AuthManager instance."""
return request.app.state.auth_manager


# ---------------------------------------------------------------------------
# FastAPI dependency: required authentication
# ---------------------------------------------------------------------------

def get_current_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer_scheme),
auth_manager: AuthManager = Depends(get_auth_manager),
) -> dict:
"""
FastAPI dependency that requires a valid Bearer token.

Returns the decoded token payload as a dict::

{"user_id": "1", "username": "alice", "role": "author"}

Raises:
HTTPException 401 – if no token, expired, or invalid
"""
if credentials is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail={"error": "Authentication required", "message": "No token provided"},
headers={"WWW-Authenticate": "Bearer"},
)

token = credentials.credentials
try:
payload = auth_manager.decode_token(token)
return {
"user_id": payload["user_id"],
"username": payload["username"],
"role": payload["role"],
}
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail={"error": "Token expired", "message": "Please login again"},
headers={"WWW-Authenticate": "Bearer"},
)
except jwt.InvalidTokenError:
# Do not expose internal JWT error details to the client
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail={"error": "Invalid token", "message": "Token is invalid"},
headers={"WWW-Authenticate": "Bearer"},
)


# ---------------------------------------------------------------------------
# FastAPI dependency: optional authentication
# ---------------------------------------------------------------------------

def get_optional_user(
credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer_scheme),
auth_manager: AuthManager = Depends(get_auth_manager),
) -> Optional[dict]:
"""
FastAPI dependency that accepts an optional Bearer token.

Returns the payload dict when a valid token is present, otherwise None.
Invalid/expired tokens are silently ignored.
"""
if credentials is None:
return None
try:
payload = auth_manager.decode_token(credentials.credentials)
return {
"user_id": payload["user_id"],
"username": payload["username"],
"role": payload["role"],
}
except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
return None
46 changes: 46 additions & 0 deletions test-apps/03-fastapi-blog-api/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""
Configuration for FastAPI Blog API.
Settings are loaded from environment variables with sensible defaults.
"""
import os
from datetime import timedelta


class Settings:
"""Application settings (read from environment variables)."""

# JWT
JWT_SECRET_KEY: str = os.environ.get(
"JWT_SECRET_KEY", "jwt-secret-key-change-in-production"
)
JWT_ALGORITHM: str = "HS256"
JWT_EXPIRATION: timedelta = timedelta(
minutes=int(os.environ.get("JWT_EXPIRATION_MINUTES", 60 * 24))
)

# API metadata
API_TITLE: str = "FastAPI Blog API"
API_VERSION: str = "1.0.0"
API_DESCRIPTION: str = "Blog API with RBAC authorization (FastAPI edition)"

# CORS
CORS_ORIGINS: list = [
"http://localhost:3000",
"http://localhost:8000",
]

# Pagination
DEFAULT_PAGE_SIZE: int = 20
MAX_PAGE_SIZE: int = 100

# Multi-tenancy
DEFAULT_DOMAIN: str = "default"


class TestingSettings(Settings):
"""Shorter token expiry for tests."""
JWT_EXPIRATION = timedelta(minutes=5)


settings = Settings()
testing_settings = TestingSettings()
187 changes: 187 additions & 0 deletions test-apps/03-fastapi-blog-api/dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
"""
RBAC Dependency Injection for FastAPI Blog API.

Replaces Flask's custom decorators with FastAPI's native DI system.
Each class/function is a callable that can be used with Depends().

Usage in routes::

@router.post("/posts")
def create_post(
data: CreatePostRequest,
current_user: dict = Depends(get_current_user),
_rbac: None = Depends(RequirePermission("create", "post")),
storage: InMemoryStorage = Depends(get_storage),
rbac: RBAC = Depends(get_rbac),
): ...
"""
import logging
from typing import Optional

from fastapi import Depends, HTTPException, Request, status

from auth import get_current_user
from storage import InMemoryStorage

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# App-state accessors (thin DI wrappers over request.app.state)
# ---------------------------------------------------------------------------

def get_storage(request: Request) -> InMemoryStorage:
"""Return the shared InMemoryStorage from app.state."""
return request.app.state.storage


def get_rbac(request: Request):
"""Return the shared RBAC instance from app.state."""
return request.app.state.rbac


# ---------------------------------------------------------------------------
# Permission dependency (class-based, works with Depends)
# ---------------------------------------------------------------------------

class RequirePermission:
"""
FastAPI dependency that enforces an RBAC permission check.

Optionally verifies resource ownership when *check_ownership* is True.
The resource ID is read from path parameters (``post_id`` or ``comment_id``).

Example::

@router.put("/posts/{post_id}")
def update_post(
post_id: int,
data: UpdatePostRequest,
current_user: dict = Depends(get_current_user),
_: None = Depends(RequirePermission("update", "post", check_ownership=True)),
): ...
"""

def __init__(
self,
action: str,
resource_type: Optional[str] = None,
check_ownership: bool = False,
) -> None:
self.action = action
self.resource_type = resource_type
self.check_ownership = check_ownership

def __call__(
self,
request: Request,
current_user: dict = Depends(get_current_user),
storage: InMemoryStorage = Depends(get_storage),
rbac=Depends(get_rbac),
) -> None:
"""Perform the RBAC check; raises HTTPException on failure."""
resource = None

if self.check_ownership:
resource_id = (
request.path_params.get("post_id")
or request.path_params.get("comment_id")
or request.path_params.get("id")
)
if resource_id:
resource_id = str(resource_id)
if self.resource_type and "post" in self.resource_type:
resource = storage.get_post(resource_id)
elif self.resource_type and "comment" in self.resource_type:
resource = storage.get_comment(resource_id)

if resource is None:
label = (self.resource_type or "resource").capitalize()
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail={"error": "Not found", "message": f"{label} not found"},
)

try:
context: dict = {
"user_id": current_user["user_id"],
"username": current_user["username"],
"role": current_user["role"],
}

if resource is not None:
owner_id = getattr(resource, "author_id", None) or getattr(resource, "user_id", None)
context["resource_owner"] = owner_id
context["is_owner"] = owner_id == current_user["user_id"]

rbac_user_id = f"user_{current_user['user_id']}"
can_access = rbac.can(
user_id=rbac_user_id,
action=self.action,
resource=self.resource_type,
context=context,
)

if not can_access:
if self.check_ownership and resource and not context.get("is_owner"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"error": "Forbidden",
"message": "You can only modify your own content",
"reason": "ownership_required",
},
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"error": "Forbidden",
"message": f"You do not have permission to {self.action} {self.resource_type}",
"reason": "permission_denied",
},
)

except HTTPException:
raise # Re-raise HTTP exceptions untouched
except Exception as exc:
logger.error("Authorization check failed: %s", str(exc), exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={"error": "Authorization error", "message": "Failed to check permissions"},
)


# ---------------------------------------------------------------------------
# Role dependency
# ---------------------------------------------------------------------------

class RequireRole:
"""
FastAPI dependency that enforces one of the specified role(s).

Example::

@router.get("/admin/users")
def list_users(
current_user: dict = Depends(get_current_user),
_: None = Depends(RequireRole("admin")),
): ...
"""

def __init__(self, *roles: str) -> None:
self.roles = roles

def __call__(self, current_user: dict = Depends(get_current_user)) -> None:
if current_user.get("role") not in self.roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail={
"error": "Forbidden",
"message": f"This action requires one of these roles: {', '.join(self.roles)}",
"your_role": current_user.get("role"),
},
)


# Shorthand singleton for admin-only routes
require_admin = RequireRole("admin")
Loading
Loading