From b9b20d3f5841db629e1accf64b4c927f761e2cae Mon Sep 17 00:00:00 2001 From: Maneesh-Relanto Date: Mon, 23 Feb 2026 13:29:06 +0530 Subject: [PATCH] feat: add FastAPI Blog API integration (test-apps/03-fastapi-blog-api) Complete FastAPI Blog API demonstrating RBAC integration with FastAPI patterns. Mirrors the Flask test-app (02) but using idiomatic FastAPI conventions. Architecture: - Pydantic v2 models for request/response validation + auto OpenAPI docs - FastAPI Depends() DI system replaces Flask decorators - HTTPBearer authentication with optional auth support - Lifespan context manager for startup/shutdown (vs Flask app factory) - RequirePermission / RequireRole class-based DI guards Files added: - config.py Settings dataclass with env var support - models.py App-layer dataclasses (User, Post, Comment, SystemStats) - schemas.py Pydantic v2 request+response models - storage.py InMemoryStorage CRUD layer - auth.py AuthManager + FastAPI auth dependencies - dependencies.py RequirePermission, RequireRole DI classes + storage/rbac accessors - seed_data.py Demo data loader (5 users, 5 posts, 7 comments) - app.py FastAPI application with lifespan, CORS, all routes - requirements.txt fastapi, uvicorn, PyJWT, bcrypt, httpx, pytest-asyncio - test_api.py 39 tests covering auth, posts, comments, admin, RBAC enforcement Tests: 39/39 passing --- test-apps/03-fastapi-blog-api/app.py | 687 ++++++++++++++++++ test-apps/03-fastapi-blog-api/auth.py | 152 ++++ test-apps/03-fastapi-blog-api/config.py | 46 ++ test-apps/03-fastapi-blog-api/dependencies.py | 187 +++++ test-apps/03-fastapi-blog-api/models.py | 132 ++++ .../03-fastapi-blog-api/requirements.txt | 13 + test-apps/03-fastapi-blog-api/schemas.py | 249 +++++++ test-apps/03-fastapi-blog-api/seed_data.py | 130 ++++ test-apps/03-fastapi-blog-api/storage.py | 208 ++++++ test-apps/03-fastapi-blog-api/test_api.py | 417 +++++++++++ 10 files changed, 2221 insertions(+) create mode 100644 test-apps/03-fastapi-blog-api/app.py create mode 100644 test-apps/03-fastapi-blog-api/auth.py create mode 100644 test-apps/03-fastapi-blog-api/config.py create mode 100644 test-apps/03-fastapi-blog-api/dependencies.py create mode 100644 test-apps/03-fastapi-blog-api/models.py create mode 100644 test-apps/03-fastapi-blog-api/requirements.txt create mode 100644 test-apps/03-fastapi-blog-api/schemas.py create mode 100644 test-apps/03-fastapi-blog-api/seed_data.py create mode 100644 test-apps/03-fastapi-blog-api/storage.py create mode 100644 test-apps/03-fastapi-blog-api/test_api.py diff --git a/test-apps/03-fastapi-blog-api/app.py b/test-apps/03-fastapi-blog-api/app.py new file mode 100644 index 0000000..107c852 --- /dev/null +++ b/test-apps/03-fastapi-blog-api/app.py @@ -0,0 +1,687 @@ +""" +FastAPI Blog API — Main Application +==================================== +A feature-complete blog REST API demonstrating RBAC integration with FastAPI. + +Mirrors the Flask test-app (02-flask-blog-api) but uses FastAPI patterns: + • Pydantic v2 request/response models → automatic validation + OpenAPI docs + • Dependency injection via Depends() → clean, testable route handlers + • Lifespan context manager → replaces Flask's before_first_request + • HTTPBearer authentication → standard FastAPI auth pattern + +Run:: + + pip install -r requirements.txt + python app.py + +Or with uvicorn:: + + uvicorn app:app --reload --port 8000 + +Interactive docs available at: http://localhost:8000/docs +""" +import logging +import sys +from contextlib import asynccontextmanager +from typing import Optional + +import uvicorn +from fastapi import Depends, FastAPI, HTTPException, Request, status +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import JSONResponse + +# ── local imports (all in same directory) ────────────────────────────────── +from auth import get_current_user, get_optional_user +from config import Settings, TestingSettings +from dependencies import RequirePermission, RequireRole, get_rbac, get_storage +from models import PostStatus +from schemas import ( + CommentListResponse, + CommentResponse, + CreateCommentRequest, + CreatePostRequest, + ErrorResponse, + HealthResponse, + MessageResponse, + PostListResponse, + PostResponse, + RegisterRequest, + LoginRequest, + StatsResponse, + TokenResponse, + UpdatePostRequest, + UpdateRoleRequest, + UserListResponse, + UserResponse, +) +from seed_data import load_seed_data +from storage import InMemoryStorage + +logger = logging.getLogger(__name__) + +# ── RBAC library ──────────────────────────────────────────────────────────── +try: + import sys, os + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) + from rbac import RBAC +except ImportError as e: + logger.error("Failed to import RBAC library: %s", e) + sys.exit(1) + + +# ============================================================================ +# RBAC Setup (permissions & roles shared across lifespan calls) +# ============================================================================ + +def setup_rbac(rbac: RBAC) -> None: + """Create all roles, permissions, and assign permissions to roles.""" + + # ── permissions ───────────────────────────────────────────────────────── + permissions = [ + ("create", "post", "Create a blog post"), + ("read", "post", "Read a blog post"), + ("update", "post", "Update a blog post"), + ("delete", "post", "Delete a blog post"), + ("publish","post", "Publish/unpublish a blog post"), + ("create", "comment", "Add a comment"), + ("read", "comment", "Read comments"), + ("delete", "comment", "Delete a comment"), + ("manage", "users", "Manage user accounts"), + ("view", "stats", "View system statistics"), + ] + for action, resource, description in permissions: + perm_id = f"perm_{action}_{resource}" + try: + rbac.create_permission( + permission_id=perm_id, + resource_type=resource, + action=action, + description=description, + ) + except Exception: + pass # May already exist in testing scenarios + + # ── roles ─────────────────────────────────────────────────────────────── + roles = [ + ("role_admin", "Administrator", "Full access"), + ("role_editor", "Editor", "Manage all content"), + ("role_author", "Author", "Manage own content"), + ("role_reader", "Reader", "Read-only access"), + ] + for role_id, name, desc in roles: + try: + rbac.create_role(role_id=role_id, name=name, description=desc) + except Exception: + pass + + # ── role → permission assignments ─────────────────────────────────────── + role_permissions: dict[str, list[str]] = { + "role_admin": [ + "perm_create_post", "perm_read_post", "perm_update_post", + "perm_delete_post", "perm_publish_post", + "perm_create_comment", "perm_read_comment", "perm_delete_comment", + "perm_manage_users", "perm_view_stats", + ], + "role_editor": [ + "perm_create_post", "perm_read_post", "perm_update_post", + "perm_delete_post", "perm_publish_post", + "perm_create_comment", "perm_read_comment", "perm_delete_comment", + "perm_view_stats", + ], + "role_author": [ + "perm_create_post", "perm_read_post", "perm_update_post", "perm_delete_post", + "perm_create_comment", "perm_read_comment", "perm_delete_comment", + ], + "role_reader": [ + "perm_read_post", + "perm_create_comment", "perm_read_comment", + ], + } + + for role_id, perms in role_permissions.items(): + for perm_id in perms: + try: + rbac.add_permission_to_role(role_id=role_id, permission_id=perm_id) + except Exception: + pass + + +# ============================================================================ +# Application factory +# ============================================================================ + +def create_app(testing: bool = False) -> FastAPI: + """ + Create and configure the FastAPI application. + + Args: + testing: When True, uses TestingSettings (no DB state between imports). + """ + cfg: Settings = TestingSettings() if testing else Settings() + + # ── lifespan: init & teardown at startup / shutdown ───────────────────── + @asynccontextmanager + async def lifespan(app: FastAPI): + # Startup + storage = InMemoryStorage() + rbac = RBAC() # defaults to in-memory storage + from auth import AuthManager + auth_manager = AuthManager(cfg) + + # Configure RBAC schema + setup_rbac(rbac) + + # Store on app.state so dependency functions can access them + app.state.storage = storage + app.state.rbac = rbac + app.state.auth_manager = auth_manager + app.state.config = cfg + + # Seed with demo data (skipped in unit tests that call this factory + # with testing=True and then reload their own fixtures) + if not testing: + try: + load_seed_data(storage, rbac, auth_manager) + except Exception as exc: + logger.warning("Seed data loading failed (non-fatal): %s", exc) + + logger.info("FastAPI Blog API started (testing=%s)", testing) + yield + + # Shutdown + logger.info("FastAPI Blog API shutting down") + + # ── FastAPI instance ───────────────────────────────────────────────────── + app = FastAPI( + title="FastAPI Blog API with RBAC", + description=( + "A blog REST API demonstrating Role-Based Access Control (RBAC) " + "using the rbac-algorithm library. " + "Interactive docs: /docs | ReDoc: /redoc" + ), + version="1.0.0", + lifespan=lifespan, + docs_url="/docs", + redoc_url="/redoc", + ) + + # ── CORS ──────────────────────────────────────────────────────────────── + app.add_middleware( + CORSMiddleware, + allow_origins=cfg.CORS_ORIGINS, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + # ── Custom exception handlers ──────────────────────────────────────────── + @app.exception_handler(404) + async def not_found_handler(request: Request, exc: HTTPException): + return JSONResponse( + status_code=404, + content={"error": "Not Found", "message": "The requested resource was not found"}, + ) + + @app.exception_handler(405) + async def method_not_allowed_handler(request: Request, exc: HTTPException): + return JSONResponse( + status_code=405, + content={"error": "Method Not Allowed", "message": "This HTTP method is not allowed here"}, + ) + + @app.exception_handler(500) + async def internal_error_handler(request: Request, exc: Exception): + logger.error("Unhandled error: %s", str(exc), exc_info=True) + return JSONResponse( + status_code=500, + content={"error": "Internal Server Error", "message": "An unexpected error occurred"}, + ) + + # ======================================================================== + # Routes + # ======================================================================== + + # ── Health & root ──────────────────────────────────────────────────────── + + @app.get("/health", response_model=HealthResponse, tags=["System"]) + async def health_check(): + """Health check — returns service status.""" + return HealthResponse(status="healthy", service="FastAPI Blog API", version="1.0.0") + + @app.get("/", tags=["System"]) + async def root(): + """API information.""" + return { + "service": "FastAPI Blog API with RBAC", + "version": "1.0.0", + "docs": "/docs", + "health": "/health", + "endpoints": { + "auth": ["/auth/register", "/auth/login", "/auth/me"], + "posts": ["/posts", "/posts/{id}", "/posts/{id}/publish", "/posts/{id}/comments"], + "comments":["/comments/{id}"], + "admin": ["/admin/users", "/admin/users/{id}/role", "/admin/stats"], + }, + } + + # ── Authentication ─────────────────────────────────────────────────────── + + @app.post("/auth/register", response_model=TokenResponse, status_code=201, tags=["Auth"]) + async def register( + body: RegisterRequest, + request: Request, + storage: InMemoryStorage = Depends(get_storage), + rbac=Depends(get_rbac), + ): + """Register a new user account.""" + if storage.get_user_by_username(body.username): + raise HTTPException(status_code=409, detail={"error": "Conflict", "message": "Username already taken"}) + if storage.get_user_by_email(body.email): + raise HTTPException(status_code=409, detail={"error": "Conflict", "message": "Email already registered"}) + + auth_manager = request.app.state.auth_manager + pw_hash = auth_manager.hash_password(body.password) + user = storage.create_user( + username=body.username, + email=body.email, + password_hash=pw_hash, + role="reader", + ) + + rbac.create_user(user_id=f"user_{user.id}", email=user.email, name=user.username) + rbac.assign_role(f"user_{user.id}", "role_reader") + + token = auth_manager.generate_token(user.id, user.username, user.role) + return TokenResponse( + access_token=token, + user=UserResponse( + id=user.id, username=user.username, email=user.email, + role=user.role, created_at=user.created_at.isoformat(), + ), + message="Registration successful", + ) + + @app.post("/auth/login", response_model=TokenResponse, tags=["Auth"]) + async def login( + body: LoginRequest, + request: Request, + storage: InMemoryStorage = Depends(get_storage), + ): + """Log in and receive a JWT access token.""" + user = storage.get_user_by_username(body.username) + auth_manager = request.app.state.auth_manager + if not user or not auth_manager.verify_password(body.password, user.password_hash): + raise HTTPException(status_code=401, detail={"error": "Unauthorized", "message": "Invalid credentials"}) + + token = auth_manager.generate_token(user.id, user.username, user.role) + return TokenResponse( + access_token=token, + user=UserResponse( + id=user.id, username=user.username, email=user.email, + role=user.role, created_at=user.created_at.isoformat(), + ), + message="Login successful", + ) + + @app.get("/auth/me", response_model=UserResponse, tags=["Auth"]) + async def get_me( + current_user: dict = Depends(get_current_user), + storage: InMemoryStorage = Depends(get_storage), + ): + """Return the currently authenticated user's profile.""" + user = storage.get_user(current_user["user_id"]) + if not user: + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "User not found"}) + return UserResponse( + id=user.id, username=user.username, email=user.email, + role=user.role, created_at=user.created_at.isoformat(), + ) + + # ── Posts ──────────────────────────────────────────────────────────────── + + @app.get("/posts", response_model=PostListResponse, tags=["Posts"]) + async def list_posts( + status_filter: Optional[str] = None, + current_user: Optional[dict] = Depends(get_optional_user), + storage: InMemoryStorage = Depends(get_storage), + ): + """ + List blog posts. + + - Unauthenticated users and readers see published posts only. + - Authors see their own drafts + all published posts. + - Editors and admins see all posts. + """ + role = current_user.get("role") if current_user else None + + if role in ("admin", "editor"): + status_enum = PostStatus(status_filter) if status_filter else None + posts = storage.list_posts(status=status_enum) + elif role == "author" and current_user: + posts_all = storage.list_posts() + posts = [ + p for p in posts_all + if p.status == PostStatus.PUBLISHED or p.author_id == current_user["user_id"] + ] + else: + posts = storage.list_posts(status=PostStatus.PUBLISHED) + + return PostListResponse( + posts=[p.to_summary_dict() for p in posts], + total=len(posts), + ) + + @app.post("/posts", response_model=PostResponse, status_code=201, tags=["Posts"]) + async def create_post( + body: CreatePostRequest, + current_user: dict = Depends(get_current_user), + _perm: None = Depends(RequirePermission("create", "post")), + storage: InMemoryStorage = Depends(get_storage), + ): + """Create a new blog post (requires 'create post' permission).""" + post = storage.create_post( + title=body.title, + content=body.content, + author_id=current_user["user_id"], + author_username=current_user["username"], + status=PostStatus(body.status) if body.status else PostStatus.DRAFT, + tags=body.tags or [], + ) + return PostResponse(**post.to_dict()) + + @app.get("/posts/{post_id}", tags=["Posts"]) + async def get_post( + post_id: str, + current_user: Optional[dict] = Depends(get_optional_user), + storage: InMemoryStorage = Depends(get_storage), + ): + """Get a single post by ID.""" + post = storage.get_post(post_id) + if not post: + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "Post not found"}) + + role = current_user.get("role") if current_user else None + is_owner = current_user and post.author_id == current_user.get("user_id") + + if post.status != PostStatus.PUBLISHED and not (role in ("admin", "editor") or is_owner): + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "Post not found"}) + + return post.to_dict() + + @app.put("/posts/{post_id}", tags=["Posts"]) + async def update_post( + post_id: str, + body: UpdatePostRequest, + current_user: dict = Depends(get_current_user), + storage: InMemoryStorage = Depends(get_storage), + rbac=Depends(get_rbac), + ): + """Update a post. Authors can only update their own posts; editors/admins can update any.""" + post = storage.get_post(post_id) + if not post: + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "Post not found"}) + + role = current_user.get("role") + is_owner = post.author_id == current_user["user_id"] + + can_update = rbac.can( + user_id=f"user_{current_user['user_id']}", + action="update", + resource="post", + context={ + "user_id": current_user["user_id"], + "role": role, + "resource_owner": post.author_id, + "is_owner": is_owner, + }, + ) + if not can_update: + raise HTTPException( + status_code=403, + detail={"error": "Forbidden", "message": "You do not have permission to update this post"}, + ) + + updates: dict = {} + if body.title is not None: + updates["title"] = body.title + if body.content is not None: + updates["content"] = body.content + if body.tags is not None: + updates["tags"] = body.tags + + updated = storage.update_post(post_id, **updates) + return updated.to_dict() + + @app.delete("/posts/{post_id}", response_model=MessageResponse, tags=["Posts"]) + async def delete_post( + post_id: str, + current_user: dict = Depends(get_current_user), + storage: InMemoryStorage = Depends(get_storage), + rbac=Depends(get_rbac), + ): + """Delete a post. Authors can delete their own; editors/admins can delete any.""" + post = storage.get_post(post_id) + if not post: + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "Post not found"}) + + is_owner = post.author_id == current_user["user_id"] + role = current_user.get("role") + + # Admin and editor can delete any post; authors can only delete their own + if role in ("admin", "editor"): + can_delete = True + elif role == "author" and is_owner: + can_delete = True + else: + can_delete = False + if not can_delete: + raise HTTPException( + status_code=403, + detail={"error": "Forbidden", "message": "You do not have permission to delete this post"}, + ) + + storage.delete_post(post_id) + return MessageResponse(message="Post deleted successfully") + + @app.put("/posts/{post_id}/publish", tags=["Posts"]) + async def publish_post( + post_id: str, + current_user: dict = Depends(get_current_user), + _perm: None = Depends(RequirePermission("publish", "post")), + storage: InMemoryStorage = Depends(get_storage), + ): + """Toggle a post's published/draft status.""" + post = storage.get_post(post_id) + if not post: + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "Post not found"}) + + new_status = PostStatus.PUBLISHED if post.status == PostStatus.DRAFT else PostStatus.DRAFT + updated = storage.update_post(post_id, status=new_status) + return { + **updated.to_dict(), + "message": f"Post {'published' if new_status == PostStatus.PUBLISHED else 'unpublished'} successfully", + } + + # ── Comments ───────────────────────────────────────────────────────────── + + @app.get("/posts/{post_id}/comments", response_model=CommentListResponse, tags=["Comments"]) + async def get_comments( + post_id: str, + storage: InMemoryStorage = Depends(get_storage), + ): + """Get all comments for a post.""" + post = storage.get_post(post_id) + if not post: + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "Post not found"}) + + comments = storage.list_comments(post_id=post_id) + return CommentListResponse( + comments=[c.to_dict() for c in comments], + total=len(comments), + ) + + @app.post("/posts/{post_id}/comments", response_model=CommentResponse, status_code=201, tags=["Comments"]) + async def add_comment( + post_id: str, + body: CreateCommentRequest, + current_user: dict = Depends(get_current_user), + _perm: None = Depends(RequirePermission("create", "comment")), + storage: InMemoryStorage = Depends(get_storage), + ): + """Add a comment to a post.""" + post = storage.get_post(post_id) + if not post: + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "Post not found"}) + + if post.status != PostStatus.PUBLISHED: + raise HTTPException( + status_code=400, + detail={"error": "Bad Request", "message": "Cannot comment on non-published posts"}, + ) + + comment = storage.create_comment( + post_id=post_id, + content=body.content, + author_id=current_user["user_id"], + author_username=current_user["username"], + ) + return CommentResponse(**comment.to_dict()) + + @app.delete("/comments/{comment_id}", response_model=MessageResponse, tags=["Comments"]) + async def delete_comment( + comment_id: str, + current_user: dict = Depends(get_current_user), + storage: InMemoryStorage = Depends(get_storage), + rbac=Depends(get_rbac), + ): + """Delete a comment. Authors can delete their own; editors/admins can delete any.""" + comment = storage.get_comment(comment_id) + if not comment: + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "Comment not found"}) + + is_owner = comment.author_id == current_user["user_id"] + role = current_user.get("role") + + # Admin and editor can delete any comment; anyone who owns it can delete it + if role in ("admin", "editor"): + can_delete = True + elif is_owner: + can_delete = True + else: + can_delete = False + if not can_delete: + raise HTTPException( + status_code=403, + detail={"error": "Forbidden", "message": "You do not have permission to delete this comment"}, + ) + + storage.delete_comment(comment_id) + return MessageResponse(message="Comment deleted successfully") + + # ── Admin ──────────────────────────────────────────────────────────────── + + @app.get("/admin/users", response_model=UserListResponse, tags=["Admin"]) + async def list_users( + current_user: dict = Depends(get_current_user), + _role: None = Depends(RequireRole("admin")), + storage: InMemoryStorage = Depends(get_storage), + ): + """List all users (admin only).""" + users = storage.list_users() + return UserListResponse( + users=[ + UserResponse( + id=u.id, username=u.username, email=u.email, + role=u.role, created_at=u.created_at.isoformat(), + ) + for u in users + ], + total=len(users), + ) + + @app.put("/admin/users/{user_id}/role", response_model=UserResponse, tags=["Admin"]) + async def update_user_role( + user_id: str, + body: UpdateRoleRequest, + current_user: dict = Depends(get_current_user), + _role: None = Depends(RequireRole("admin")), + storage: InMemoryStorage = Depends(get_storage), + rbac=Depends(get_rbac), + ): + """Update a user's role (admin only).""" + if user_id == current_user["user_id"]: + raise HTTPException( + status_code=400, + detail={"error": "Bad Request", "message": "You cannot change your own role"}, + ) + + user = storage.get_user(user_id) + if not user: + raise HTTPException(status_code=404, detail={"error": "Not Found", "message": "User not found"}) + + allowed_roles = {"reader", "author", "editor", "admin"} + if body.role not in allowed_roles: + raise HTTPException( + status_code=400, + detail={ + "error": "Bad Request", + "message": f"Invalid role. Must be one of: {', '.join(sorted(allowed_roles))}", + }, + ) + + updated_user = storage.update_user_role(user_id, body.role) + rbac_user_id = f"user_{user_id}" + old_rbac_role = f"role_{user.role}" + new_rbac_role = f"role_{body.role}" + + try: + rbac.revoke_role(rbac_user_id, old_rbac_role) + except Exception: + pass + rbac.assign_role(rbac_user_id, new_rbac_role) + + return UserResponse( + id=updated_user.id, + username=updated_user.username, + email=updated_user.email, + role=updated_user.role, + created_at=updated_user.created_at.isoformat(), + ) + + @app.get("/admin/stats", response_model=StatsResponse, tags=["Admin"]) + async def get_stats( + current_user: dict = Depends(get_current_user), + _role: None = Depends(RequireRole("admin", "editor")), + storage: InMemoryStorage = Depends(get_storage), + ): + """Get system statistics (admin and editor only).""" + stats = storage.get_stats() + return StatsResponse( + total_users=stats.total_users, + total_posts=stats.total_posts, + published_posts=stats.published_posts, + draft_posts=stats.draft_posts, + total_comments=stats.total_comments, + ) + + return app + + +# ============================================================================ +# Application singleton (imported by uvicorn / test client) +# ============================================================================ +app = create_app() + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s — %(message)s", + ) + uvicorn.run( + "app:app", + host="0.0.0.0", + port=8000, + reload=True, + log_level="info", + ) diff --git a/test-apps/03-fastapi-blog-api/auth.py b/test-apps/03-fastapi-blog-api/auth.py new file mode 100644 index 0000000..95518bf --- /dev/null +++ b/test-apps/03-fastapi-blog-api/auth.py @@ -0,0 +1,152 @@ +""" +JWT Authentication module for FastAPI Blog API. +Uses FastAPI's Dependency Injection instead of Flask's g/decorators. +""" +import logging +from typing import Optional + +import bcrypt +import jwt +from fastapi import Depends, HTTPException, Request, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer + +from config import Settings + +logger = logging.getLogger(__name__) + +# HTTPBearer with auto_error=False so optional auth returns None instead of 401 +_bearer_scheme = HTTPBearer(auto_error=False) + + +class AuthManager: + """Manages JWT authentication operations.""" + + def __init__(self, cfg: Settings) -> None: + self.secret_key = cfg.JWT_SECRET_KEY + self.algorithm = cfg.JWT_ALGORITHM + self.expiration = cfg.JWT_EXPIRATION + + # ------------------------------------------------------------------ + # Password helpers + # ------------------------------------------------------------------ + + def hash_password(self, password: str) -> str: + """Hash a plain-text password using bcrypt.""" + salt = bcrypt.gensalt() + return bcrypt.hashpw(password.encode(), salt).decode() + + def verify_password(self, password: str, hashed: str) -> bool: + """Verify a plain-text password against its bcrypt hash.""" + return bcrypt.checkpw(password.encode(), hashed.encode()) + + # ------------------------------------------------------------------ + # Token helpers + # ------------------------------------------------------------------ + + def generate_token(self, user_id: str, username: str, role: str) -> str: + """Generate a signed JWT for the given user.""" + from datetime import datetime, timezone + + now = datetime.now(timezone.utc) + payload = { + "user_id": user_id, + "username": username, + "role": role, + "iat": now, + "exp": now + self.expiration, + } + return jwt.encode(payload, self.secret_key, algorithm=self.algorithm) + + def decode_token(self, token: str) -> dict: + """ + Decode and verify a JWT. + + Raises: + jwt.ExpiredSignatureError: token has expired + jwt.InvalidTokenError: token is otherwise invalid + """ + return jwt.decode(token, self.secret_key, algorithms=[self.algorithm]) + + +# --------------------------------------------------------------------------- +# FastAPI dependency: retrieve AuthManager from app.state +# --------------------------------------------------------------------------- + +def get_auth_manager(request: Request) -> AuthManager: + """FastAPI dependency — returns the shared AuthManager instance.""" + return request.app.state.auth_manager + + +# --------------------------------------------------------------------------- +# FastAPI dependency: required authentication +# --------------------------------------------------------------------------- + +def get_current_user( + credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer_scheme), + auth_manager: AuthManager = Depends(get_auth_manager), +) -> dict: + """ + FastAPI dependency that requires a valid Bearer token. + + Returns the decoded token payload as a dict:: + + {"user_id": "1", "username": "alice", "role": "author"} + + Raises: + HTTPException 401 – if no token, expired, or invalid + """ + if credentials is None: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail={"error": "Authentication required", "message": "No token provided"}, + headers={"WWW-Authenticate": "Bearer"}, + ) + + token = credentials.credentials + try: + payload = auth_manager.decode_token(token) + return { + "user_id": payload["user_id"], + "username": payload["username"], + "role": payload["role"], + } + except jwt.ExpiredSignatureError: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail={"error": "Token expired", "message": "Please login again"}, + headers={"WWW-Authenticate": "Bearer"}, + ) + except jwt.InvalidTokenError: + # Do not expose internal JWT error details to the client + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail={"error": "Invalid token", "message": "Token is invalid"}, + headers={"WWW-Authenticate": "Bearer"}, + ) + + +# --------------------------------------------------------------------------- +# FastAPI dependency: optional authentication +# --------------------------------------------------------------------------- + +def get_optional_user( + credentials: Optional[HTTPAuthorizationCredentials] = Depends(_bearer_scheme), + auth_manager: AuthManager = Depends(get_auth_manager), +) -> Optional[dict]: + """ + FastAPI dependency that accepts an optional Bearer token. + + Returns the payload dict when a valid token is present, otherwise None. + Invalid/expired tokens are silently ignored. + """ + if credentials is None: + return None + try: + payload = auth_manager.decode_token(credentials.credentials) + return { + "user_id": payload["user_id"], + "username": payload["username"], + "role": payload["role"], + } + except (jwt.ExpiredSignatureError, jwt.InvalidTokenError): + return None diff --git a/test-apps/03-fastapi-blog-api/config.py b/test-apps/03-fastapi-blog-api/config.py new file mode 100644 index 0000000..34cb76c --- /dev/null +++ b/test-apps/03-fastapi-blog-api/config.py @@ -0,0 +1,46 @@ +""" +Configuration for FastAPI Blog API. +Settings are loaded from environment variables with sensible defaults. +""" +import os +from datetime import timedelta + + +class Settings: + """Application settings (read from environment variables).""" + + # JWT + JWT_SECRET_KEY: str = os.environ.get( + "JWT_SECRET_KEY", "jwt-secret-key-change-in-production" + ) + JWT_ALGORITHM: str = "HS256" + JWT_EXPIRATION: timedelta = timedelta( + minutes=int(os.environ.get("JWT_EXPIRATION_MINUTES", 60 * 24)) + ) + + # API metadata + API_TITLE: str = "FastAPI Blog API" + API_VERSION: str = "1.0.0" + API_DESCRIPTION: str = "Blog API with RBAC authorization (FastAPI edition)" + + # CORS + CORS_ORIGINS: list = [ + "http://localhost:3000", + "http://localhost:8000", + ] + + # Pagination + DEFAULT_PAGE_SIZE: int = 20 + MAX_PAGE_SIZE: int = 100 + + # Multi-tenancy + DEFAULT_DOMAIN: str = "default" + + +class TestingSettings(Settings): + """Shorter token expiry for tests.""" + JWT_EXPIRATION = timedelta(minutes=5) + + +settings = Settings() +testing_settings = TestingSettings() diff --git a/test-apps/03-fastapi-blog-api/dependencies.py b/test-apps/03-fastapi-blog-api/dependencies.py new file mode 100644 index 0000000..4d44b82 --- /dev/null +++ b/test-apps/03-fastapi-blog-api/dependencies.py @@ -0,0 +1,187 @@ +""" +RBAC Dependency Injection for FastAPI Blog API. + +Replaces Flask's custom decorators with FastAPI's native DI system. +Each class/function is a callable that can be used with Depends(). + +Usage in routes:: + + @router.post("/posts") + def create_post( + data: CreatePostRequest, + current_user: dict = Depends(get_current_user), + _rbac: None = Depends(RequirePermission("create", "post")), + storage: InMemoryStorage = Depends(get_storage), + rbac: RBAC = Depends(get_rbac), + ): ... +""" +import logging +from typing import Optional + +from fastapi import Depends, HTTPException, Request, status + +from auth import get_current_user +from storage import InMemoryStorage + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# App-state accessors (thin DI wrappers over request.app.state) +# --------------------------------------------------------------------------- + +def get_storage(request: Request) -> InMemoryStorage: + """Return the shared InMemoryStorage from app.state.""" + return request.app.state.storage + + +def get_rbac(request: Request): + """Return the shared RBAC instance from app.state.""" + return request.app.state.rbac + + +# --------------------------------------------------------------------------- +# Permission dependency (class-based, works with Depends) +# --------------------------------------------------------------------------- + +class RequirePermission: + """ + FastAPI dependency that enforces an RBAC permission check. + + Optionally verifies resource ownership when *check_ownership* is True. + The resource ID is read from path parameters (``post_id`` or ``comment_id``). + + Example:: + + @router.put("/posts/{post_id}") + def update_post( + post_id: int, + data: UpdatePostRequest, + current_user: dict = Depends(get_current_user), + _: None = Depends(RequirePermission("update", "post", check_ownership=True)), + ): ... + """ + + def __init__( + self, + action: str, + resource_type: Optional[str] = None, + check_ownership: bool = False, + ) -> None: + self.action = action + self.resource_type = resource_type + self.check_ownership = check_ownership + + def __call__( + self, + request: Request, + current_user: dict = Depends(get_current_user), + storage: InMemoryStorage = Depends(get_storage), + rbac=Depends(get_rbac), + ) -> None: + """Perform the RBAC check; raises HTTPException on failure.""" + resource = None + + if self.check_ownership: + resource_id = ( + request.path_params.get("post_id") + or request.path_params.get("comment_id") + or request.path_params.get("id") + ) + if resource_id: + resource_id = str(resource_id) + if self.resource_type and "post" in self.resource_type: + resource = storage.get_post(resource_id) + elif self.resource_type and "comment" in self.resource_type: + resource = storage.get_comment(resource_id) + + if resource is None: + label = (self.resource_type or "resource").capitalize() + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"error": "Not found", "message": f"{label} not found"}, + ) + + try: + context: dict = { + "user_id": current_user["user_id"], + "username": current_user["username"], + "role": current_user["role"], + } + + if resource is not None: + owner_id = getattr(resource, "author_id", None) or getattr(resource, "user_id", None) + context["resource_owner"] = owner_id + context["is_owner"] = owner_id == current_user["user_id"] + + rbac_user_id = f"user_{current_user['user_id']}" + can_access = rbac.can( + user_id=rbac_user_id, + action=self.action, + resource=self.resource_type, + context=context, + ) + + if not can_access: + if self.check_ownership and resource and not context.get("is_owner"): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={ + "error": "Forbidden", + "message": "You can only modify your own content", + "reason": "ownership_required", + }, + ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={ + "error": "Forbidden", + "message": f"You do not have permission to {self.action} {self.resource_type}", + "reason": "permission_denied", + }, + ) + + except HTTPException: + raise # Re-raise HTTP exceptions untouched + except Exception as exc: + logger.error("Authorization check failed: %s", str(exc), exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": "Authorization error", "message": "Failed to check permissions"}, + ) + + +# --------------------------------------------------------------------------- +# Role dependency +# --------------------------------------------------------------------------- + +class RequireRole: + """ + FastAPI dependency that enforces one of the specified role(s). + + Example:: + + @router.get("/admin/users") + def list_users( + current_user: dict = Depends(get_current_user), + _: None = Depends(RequireRole("admin")), + ): ... + """ + + def __init__(self, *roles: str) -> None: + self.roles = roles + + def __call__(self, current_user: dict = Depends(get_current_user)) -> None: + if current_user.get("role") not in self.roles: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={ + "error": "Forbidden", + "message": f"This action requires one of these roles: {', '.join(self.roles)}", + "your_role": current_user.get("role"), + }, + ) + + +# Shorthand singleton for admin-only routes +require_admin = RequireRole("admin") diff --git a/test-apps/03-fastapi-blog-api/models.py b/test-apps/03-fastapi-blog-api/models.py new file mode 100644 index 0000000..1864dd4 --- /dev/null +++ b/test-apps/03-fastapi-blog-api/models.py @@ -0,0 +1,132 @@ +""" +Data models for FastAPI Blog API. +Simple dataclass-based models for posts, comments, and users. +Mirrors the Flask test-app models for consistency. +""" +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Optional, List +from enum import Enum + + +class PostStatus(str, Enum): + """Post status enumeration.""" + DRAFT = "draft" + PUBLISHED = "published" + ARCHIVED = "archived" + + +@dataclass +class User: + """User model.""" + id: str + username: str + email: str + password_hash: str + role: str # admin | editor | author | reader + created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + + def to_dict(self, include_password: bool = False) -> dict: + data = { + "id": self.id, + "username": self.username, + "email": self.email, + "role": self.role, + "created_at": self.created_at.isoformat(), + } + if include_password: + data["password_hash"] = self.password_hash + return data + + def to_public_dict(self) -> dict: + return {"id": self.id, "username": self.username, "role": self.role} + + +@dataclass +class Post: + """Blog post model.""" + id: str + title: str + content: str + author_id: str + author_username: str + status: PostStatus = PostStatus.DRAFT + created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + published_at: Optional[datetime] = None + tags: List[str] = field(default_factory=list) + view_count: int = 0 + + def to_dict(self, include_author: bool = True) -> dict: + data = { + "id": self.id, + "title": self.title, + "content": self.content, + "status": self.status.value if isinstance(self.status, PostStatus) else self.status, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + "published_at": self.published_at.isoformat() if self.published_at else None, + "tags": self.tags, + "view_count": self.view_count, + } + if include_author: + data["author"] = {"id": self.author_id, "username": self.author_username} + return data + + def to_summary_dict(self) -> dict: + preview = self.content[:200] + "..." if len(self.content) > 200 else self.content + return { + "id": self.id, + "title": self.title, + "content": preview, + "status": self.status.value if isinstance(self.status, PostStatus) else self.status, + "author": {"id": self.author_id, "username": self.author_username}, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + "tags": self.tags, + "view_count": self.view_count, + } + + +@dataclass +class Comment: + """Comment model.""" + id: str + post_id: str + content: str + author_id: str + author_username: str + created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + is_deleted: bool = False + + def to_dict(self, include_author: bool = True) -> dict: + data = { + "id": self.id, + "post_id": self.post_id, + "content": self.content, + "created_at": self.created_at.isoformat(), + "updated_at": self.updated_at.isoformat(), + } + if include_author: + data["author"] = {"id": self.author_id, "username": self.author_username} + return data + + +@dataclass +class SystemStats: + """System statistics model.""" + total_users: int = 0 + total_posts: int = 0 + total_comments: int = 0 + published_posts: int = 0 + draft_posts: int = 0 + + def to_dict(self) -> dict: + return { + "total_users": self.total_users, + "total_posts": self.total_posts, + "total_comments": self.total_comments, + "published_posts": self.published_posts, + "draft_posts": self.draft_posts, + } diff --git a/test-apps/03-fastapi-blog-api/requirements.txt b/test-apps/03-fastapi-blog-api/requirements.txt new file mode 100644 index 0000000..bb4e843 --- /dev/null +++ b/test-apps/03-fastapi-blog-api/requirements.txt @@ -0,0 +1,13 @@ +# FastAPI Blog API — runtime dependencies +fastapi>=0.110.0 +uvicorn[standard]>=0.27.0 +pydantic>=2.0.0 +PyJWT>=2.8.0 +bcrypt>=4.0.0 + +# HTTP client for tests (replaces requests; also used by TestClient internally) +httpx>=0.27.0 + +# Testing +pytest>=7.0.0 +pytest-asyncio>=0.21.0 diff --git a/test-apps/03-fastapi-blog-api/schemas.py b/test-apps/03-fastapi-blog-api/schemas.py new file mode 100644 index 0000000..b4ec9a8 --- /dev/null +++ b/test-apps/03-fastapi-blog-api/schemas.py @@ -0,0 +1,249 @@ +""" +Pydantic schemas for FastAPI Blog API request and response validation. +FastAPI uses these for automatic OpenAPI docs and JSON validation. +""" +from __future__ import annotations + +from datetime import datetime +from typing import Any, Dict, List, Optional +from pydantic import BaseModel, EmailStr, field_validator + +from models import PostStatus + +# --------------------------------------------------------------------------- +# Auth schemas +# --------------------------------------------------------------------------- + +class RegisterRequest(BaseModel): + username: str + email: str + password: str + role: str = "reader" + + @field_validator("role") + @classmethod + def validate_role(cls, v: str) -> str: + valid = {"admin", "editor", "author", "reader"} + if v.lower() not in valid: + raise ValueError(f"Role must be one of: {', '.join(sorted(valid))}") + return v.lower() + + @field_validator("username") + @classmethod + def validate_username(cls, v: str) -> str: + v = v.strip() + if not v: + raise ValueError("Username cannot be empty") + return v + + @field_validator("password") + @classmethod + def validate_password(cls, v: str) -> str: + if not v: + raise ValueError("Password cannot be empty") + return v + + +class LoginRequest(BaseModel): + username: str + password: str + + @field_validator("username") + @classmethod + def validate_username(cls, v: str) -> str: + return v.strip() + + +class UserPublicResponse(BaseModel): + id: str + username: str + role: str + + +class UserResponse(BaseModel): + id: str + username: str + email: str + role: str + created_at: str + + +class TokenResponse(BaseModel): + access_token: str + token_type: str = "bearer" + user: UserResponse + message: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Post schemas +# --------------------------------------------------------------------------- + +class CreatePostRequest(BaseModel): + title: str + content: str + status: str = "draft" + tags: List[str] = [] + + @field_validator("title") + @classmethod + def validate_title(cls, v: str) -> str: + v = v.strip() + if not v: + raise ValueError("Title cannot be empty") + return v + + @field_validator("content") + @classmethod + def validate_content(cls, v: str) -> str: + v = v.strip() + if not v: + raise ValueError("Content cannot be empty") + return v + + @field_validator("status") + @classmethod + def validate_status(cls, v: str) -> str: + try: + PostStatus(v.lower()) + except ValueError: + valid = [s.value for s in PostStatus] + raise ValueError(f"Invalid status. Must be one of: {', '.join(valid)}") + return v.lower() + + +class UpdatePostRequest(BaseModel): + title: Optional[str] = None + content: Optional[str] = None + status: Optional[str] = None + tags: Optional[List[str]] = None + + @field_validator("status") + @classmethod + def validate_status(cls, v: Optional[str]) -> Optional[str]: + if v is None: + return v + try: + PostStatus(v.lower()) + except ValueError: + valid = [s.value for s in PostStatus] + raise ValueError(f"Invalid status. Must be one of: {', '.join(valid)}") + return v.lower() + + +class AuthorSummary(BaseModel): + id: str + username: str + + +class PostSummaryResponse(BaseModel): + id: str + title: str + content: str + status: str + author: AuthorSummary + created_at: str + updated_at: str + tags: List[str] + view_count: int + + +class PostResponse(BaseModel): + id: str + title: str + content: str + status: str + author: AuthorSummary + created_at: str + updated_at: str + published_at: Optional[str] + tags: List[str] + view_count: int + + +class PostListResponse(BaseModel): + posts: List[PostSummaryResponse] + total: int + count: Optional[int] = None # alias for backwards compat + + +# --------------------------------------------------------------------------- +# Comment schemas +# --------------------------------------------------------------------------- + +class CreateCommentRequest(BaseModel): + content: str + + @field_validator("content") + @classmethod + def validate_content(cls, v: str) -> str: + v = v.strip() + if not v: + raise ValueError("Content cannot be empty") + return v + + +class CommentResponse(BaseModel): + id: str + post_id: str + content: str + author: AuthorSummary + created_at: str + updated_at: str + + +class CommentListResponse(BaseModel): + comments: List[CommentResponse] + total: int + count: Optional[int] = None + + +# --------------------------------------------------------------------------- +# Admin schemas +# --------------------------------------------------------------------------- + +class UpdateRoleRequest(BaseModel): + role: str + + @field_validator("role") + @classmethod + def validate_role(cls, v: str) -> str: + valid = {"admin", "editor", "author", "reader"} + if v.lower() not in valid: + raise ValueError(f"Role must be one of: {', '.join(sorted(valid))}") + return v.lower() + + +class UserListResponse(BaseModel): + users: List[UserResponse] + total: int + count: Optional[int] = None + + +class StatsResponse(BaseModel): + total_users: int + total_posts: int + total_comments: int + published_posts: int + draft_posts: int + + +# --------------------------------------------------------------------------- +# Generic +# --------------------------------------------------------------------------- + +class MessageResponse(BaseModel): + message: str + + +class ErrorResponse(BaseModel): + error: str + message: str + detail: Optional[Any] = None + + +class HealthResponse(BaseModel): + status: str + service: Optional[str] = None + version: Optional[str] = None + timestamp: Optional[str] = None diff --git a/test-apps/03-fastapi-blog-api/seed_data.py b/test-apps/03-fastapi-blog-api/seed_data.py new file mode 100644 index 0000000..e155338 --- /dev/null +++ b/test-apps/03-fastapi-blog-api/seed_data.py @@ -0,0 +1,130 @@ +""" +Seed data for FastAPI Blog API. +Loads sample users, posts, and comments for testing/demo. +Identical dataset to the Flask test-app for easy comparison. +""" +from datetime import datetime, timezone +from models import PostStatus + + +def load_seed_data(storage, rbac, auth_manager) -> None: + """Populate storage and RBAC with demo data.""" + + print("Loading seed data...") + + # ------------------------------------------------------------------ + # Users + # ------------------------------------------------------------------ + users_data = [ + {"username": "admin", "email": "admin@blogapi.com", "password": "admin123", "role": "admin"}, + {"username": "editor", "email": "editor@blogapi.com", "password": "editor123", "role": "editor"}, + {"username": "john_author", "email": "john@blogapi.com", "password": "author123", "role": "author"}, + {"username": "jane_author", "email": "jane@blogapi.com", "password": "author123", "role": "author"}, + {"username": "bob_reader", "email": "bob@blogapi.com", "password": "reader123", "role": "reader"}, + ] + + created_users: dict = {} + for ud in users_data: + pw_hash = auth_manager.hash_password(ud["password"]) + user = storage.create_user( + username=ud["username"], + email=ud["email"], + password_hash=pw_hash, + role=ud["role"], + ) + created_users[ud["username"]] = user + + rbac.create_user(user_id=f"user_{user.id}", email=user.email, name=user.username) + rbac.assign_role(f"user_{user.id}", f"role_{user.role}") + print(f" ✓ Created user: {user.username} ({user.role})") + + # ------------------------------------------------------------------ + # Posts + # ------------------------------------------------------------------ + posts_data = [ + { + "title": "Getting Started with RBAC", + "content": "Role-Based Access Control (RBAC) is a powerful authorization model. " + "In this post we explore the fundamentals and key concepts including " + "Users, Roles, Permissions, and Resources.", + "author": "john_author", + "status": PostStatus.PUBLISHED, + "tags": ["rbac", "security", "tutorial"], + }, + { + "title": "Building REST APIs with FastAPI", + "content": "FastAPI is a modern, high-performance Python framework for building APIs. " + "It features automatic OpenAPI docs, Pydantic models, and native async support.", + "author": "john_author", + "status": PostStatus.PUBLISHED, + "tags": ["fastapi", "python", "api"], + }, + { + "title": "Draft: Advanced RBAC Patterns", + "content": "This draft explores attribute-based access control (ABAC), " + "contextual permissions, and hierarchical role designs.", + "author": "john_author", + "status": PostStatus.DRAFT, + "tags": ["rbac", "abac", "advanced"], + }, + { + "title": "Managing Blog Content at Scale", + "content": "As your blog grows, managing content, authors, and editorial workflows " + "becomes critical. Here are best practices for editorial teams.", + "author": "jane_author", + "status": PostStatus.PUBLISHED, + "tags": ["content", "management"], + }, + { + "title": "Security Best Practices for Web APIs", + "content": "Securing your API requires more than just authentication. " + "We cover authorization, rate limiting, input validation, and audit logging.", + "author": "editor", + "status": PostStatus.PUBLISHED, + "tags": ["security", "api", "best-practices"], + }, + ] + + created_posts: dict = {} + for pd in posts_data: + author = created_users[pd["author"]] + post = storage.create_post( + title=pd["title"], + content=pd["content"], + author_id=author.id, + author_username=author.username, + status=pd["status"], + tags=pd["tags"], + ) + created_posts[pd["title"]] = post + print(f" ✓ Created post: '{post.title}' by {author.username}") + + # ------------------------------------------------------------------ + # Comments + # ------------------------------------------------------------------ + comments_data = [ + {"post": "Getting Started with RBAC", "author": "bob_reader", "content": "Great intro to RBAC!"}, + {"post": "Getting Started with RBAC", "author": "jane_author", "content": "Really well explained."}, + {"post": "Building REST APIs with FastAPI","author": "bob_reader", "content": "FastAPI is amazing, thanks!"}, + {"post": "Building REST APIs with FastAPI","author": "editor", "content": "Good overview of FastAPI."}, + {"post": "Managing Blog Content at Scale","author": "john_author","content": "Very useful editorial tips."}, + {"post": "Security Best Practices for Web APIs","author": "john_author","content": "Great coverage of auth patterns."}, + {"post": "Security Best Practices for Web APIs","author": "bob_reader", "content": "Can you cover OAuth next?"}, + ] + + for cd in comments_data: + post = created_posts.get(cd["post"]) + author = created_users[cd["author"]] + if post: + storage.create_comment( + post_id=post.id, + content=cd["content"], + author_id=author.id, + author_username=author.username, + ) + + stats = storage.get_stats() + print(f"\nSeed data loaded successfully!") + print(f" Users: {stats.total_users}") + print(f" Posts: {stats.total_posts} ({stats.published_posts} published)") + print(f" Comments: {stats.total_comments}") diff --git a/test-apps/03-fastapi-blog-api/storage.py b/test-apps/03-fastapi-blog-api/storage.py new file mode 100644 index 0000000..e5d6b4a --- /dev/null +++ b/test-apps/03-fastapi-blog-api/storage.py @@ -0,0 +1,208 @@ +""" +In-memory storage for FastAPI Blog API. +Provides CRUD for users, posts, comments, and stats. +Mirrors the Flask test-app storage exactly. +""" +from typing import Dict, List, Optional +from datetime import datetime, timezone + +from models import User, Post, Comment, PostStatus, SystemStats + + +class InMemoryStorage: + """In-memory storage for blog data.""" + + def __init__(self) -> None: + self.users: Dict[str, User] = {} + self.posts: Dict[str, Post] = {} + self.comments: Dict[str, Comment] = {} + self._next_user_id = 1 + self._next_post_id = 1 + self._next_comment_id = 1 + + # ------------------------------------------------------------------ + # Users + # ------------------------------------------------------------------ + + def create_user( + self, username: str, email: str, password_hash: str, role: str + ) -> User: + user_id = str(self._next_user_id) + self._next_user_id += 1 + user = User( + id=user_id, + username=username, + email=email, + password_hash=password_hash, + role=role, + ) + self.users[user_id] = user + return user + + def get_user(self, user_id: str) -> Optional[User]: + return self.users.get(user_id) + + def get_user_by_username(self, username: str) -> Optional[User]: + for user in self.users.values(): + if user.username == username: + return user + return None + + def get_user_by_email(self, email: str) -> Optional[User]: + for user in self.users.values(): + if user.email == email: + return user + return None + + def list_users(self) -> List[User]: + return list(self.users.values()) + + def update_user_role(self, user_id: str, new_role: str) -> Optional[User]: + user = self.users.get(user_id) + if user: + user.role = new_role + return user + + def delete_user(self, user_id: str) -> bool: + if user_id in self.users: + del self.users[user_id] + return True + return False + + # ------------------------------------------------------------------ + # Posts + # ------------------------------------------------------------------ + + def create_post( + self, + title: str, + content: str, + author_id: str, + author_username: str, + status: PostStatus = PostStatus.DRAFT, + tags: Optional[List[str]] = None, + ) -> Post: + post_id = str(self._next_post_id) + self._next_post_id += 1 + now = datetime.now(timezone.utc) + post = Post( + id=post_id, + title=title, + content=content, + author_id=author_id, + author_username=author_username, + status=status, + tags=tags or [], + created_at=now, + updated_at=now, + published_at=now if status == PostStatus.PUBLISHED else None, + ) + self.posts[post_id] = post + return post + + def get_post(self, post_id: str) -> Optional[Post]: + return self.posts.get(post_id) + + def list_posts( + self, + status: Optional[PostStatus] = None, + author_id: Optional[str] = None, + ) -> List[Post]: + posts = list(self.posts.values()) + if status is not None: + posts = [p for p in posts if p.status == status] + if author_id is not None: + posts = [p for p in posts if p.author_id == author_id] + return sorted(posts, key=lambda p: p.created_at, reverse=True) + + def update_post( + self, + post_id: str, + title: Optional[str] = None, + content: Optional[str] = None, + status: Optional[PostStatus] = None, + tags: Optional[List[str]] = None, + ) -> Optional[Post]: + post = self.posts.get(post_id) + if not post: + return None + if title is not None: + post.title = title + if content is not None: + post.content = content + if status is not None: + post.status = status + if status == PostStatus.PUBLISHED and post.published_at is None: + post.published_at = datetime.now(timezone.utc) + if tags is not None: + post.tags = tags + post.updated_at = datetime.now(timezone.utc) + return post + + def delete_post(self, post_id: str) -> bool: + if post_id in self.posts: + del self.posts[post_id] + return True + return False + + # ------------------------------------------------------------------ + # Comments + # ------------------------------------------------------------------ + + def create_comment( + self, + post_id: str, + content: str, + author_id: str, + author_username: str, + ) -> Optional[Comment]: + if post_id not in self.posts: + return None + comment_id = str(self._next_comment_id) + self._next_comment_id += 1 + now = datetime.now(timezone.utc) + comment = Comment( + id=comment_id, + post_id=post_id, + content=content, + author_id=author_id, + author_username=author_username, + created_at=now, + updated_at=now, + ) + self.comments[comment_id] = comment + return comment + + def get_comment(self, comment_id: str) -> Optional[Comment]: + return self.comments.get(comment_id) + + def list_comments(self, post_id: str) -> List[Comment]: + return [ + c for c in self.comments.values() + if c.post_id == post_id and not c.is_deleted + ] + + def delete_comment(self, comment_id: str, soft: bool = True) -> bool: + comment = self.comments.get(comment_id) + if not comment: + return False + if soft: + comment.is_deleted = True + else: + del self.comments[comment_id] + return True + + # ------------------------------------------------------------------ + # Stats + # ------------------------------------------------------------------ + + def get_stats(self) -> SystemStats: + published = sum(1 for p in self.posts.values() if p.status == PostStatus.PUBLISHED) + draft = sum(1 for p in self.posts.values() if p.status == PostStatus.DRAFT) + return SystemStats( + total_users=len(self.users), + total_posts=len(self.posts), + total_comments=sum(1 for c in self.comments.values() if not c.is_deleted), + published_posts=published, + draft_posts=draft, + ) diff --git a/test-apps/03-fastapi-blog-api/test_api.py b/test-apps/03-fastapi-blog-api/test_api.py new file mode 100644 index 0000000..271ab6d --- /dev/null +++ b/test-apps/03-fastapi-blog-api/test_api.py @@ -0,0 +1,417 @@ +""" +FastAPI Blog API — Integration Tests +====================================== +All tests use FastAPI's built-in TestClient (synchronous), which is backed by +httpx and does NOT require a running server. + +Coverage mirrors the Flask test suite (test-apps/02-flask-blog-api/test_api.py). +Tests are grouped by feature domain and run fully in-memory. +""" +import sys +import os +import pytest + +# Make sure we can import the app from this directory +sys.path.insert(0, os.path.dirname(__file__)) +# Allow discovery of the RBAC library from ../../src +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "src")) + +from fastapi.testclient import TestClient +from app import create_app + + +# ============================================================================ +# Fixtures +# ============================================================================ + +@pytest.fixture(scope="module") +def app(): + """Create a test app (no seed data, clean in-memory state).""" + return create_app(testing=True) + + +@pytest.fixture(scope="module") +def client(app): + """Return a TestClient bound to the test app.""" + with TestClient(app) as c: + yield c + + +@pytest.fixture(scope="module") +def registered_users(client): + """Register all personas and return {username: token, ...}.""" + credentials = { + "admin": {"username": "test_admin", "email": "tadmin@test.com", "password": "Admin1234!"}, + "editor": {"username": "test_editor", "email": "teditor@test.com", "password": "Editor123!"}, + "author1": {"username": "test_author1", "email": "tauth1@test.com", "password": "Author123!"}, + "author2": {"username": "test_author2", "email": "tauth2@test.com", "password": "Author456!"}, + "reader": {"username": "test_reader", "email": "treader@test.com", "password": "Reader123!"}, + } + tokens = {} + user_ids = {} + for key, creds in credentials.items(): + r = client.post("/auth/register", json=creds) + assert r.status_code == 201, r.json() + data = r.json() + tokens[key] = data["access_token"] + user_ids[key] = data["user"]["id"] + + # Promote admin, editor via direct storage (test-only shortcut) + storage = client.app.state.storage + rbac = client.app.state.rbac + for key, role in [("admin", "admin"), ("editor", "editor"), ("author1", "author"), ("author2", "author")]: + uid = user_ids[key] + storage.update_user_role(uid, role) + rbac_uid = f"user_{uid}" + try: + rbac.revoke_role(rbac_uid, "role_reader") + except Exception: + pass + rbac.assign_role(rbac_uid, f"role_{role}") + + # Re-login to get fresh tokens with updated roles in the JWT + credentials_map = { + "admin": ("test_admin", "Admin1234!"), + "editor": ("test_editor", "Editor123!"), + "author1": ("test_author1", "Author123!"), + "author2": ("test_author2", "Author456!"), + "reader": ("test_reader", "Reader123!"), + } + for key, (username, password) in credentials_map.items(): + r = client.post("/auth/login", json={"username": username, "password": password}) + assert r.status_code == 200, r.json() + tokens[key] = r.json()["access_token"] + + return {"tokens": tokens, "user_ids": user_ids} + + +def auth_header(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +# ============================================================================ +# Health & Root +# ============================================================================ + +class TestHealthAndRoot: + def test_health(self, client): + r = client.get("/health") + assert r.status_code == 200 + body = r.json() + assert body["status"] == "healthy" + assert "service" in body + + def test_root(self, client): + r = client.get("/") + assert r.status_code == 200 + body = r.json() + assert "endpoints" in body + + +# ============================================================================ +# Authentication +# ============================================================================ + +class TestAuth: + def test_register_success(self, client): + r = client.post("/auth/register", json={ + "username": "newuser_auth", + "email": "newuser_auth@test.com", + "password": "NewPass123!", + }) + assert r.status_code == 201 + body = r.json() + assert "access_token" in body + assert body["user"]["username"] == "newuser_auth" + assert body["user"]["role"] == "reader" + + def test_register_duplicate_username(self, client, registered_users): + token = registered_users["tokens"]["reader"] + # reader already registered; try same username + r = client.post("/auth/register", json={ + "username": "test_reader", + "email": "other@test.com", + "password": "Pass1234!", + }) + assert r.status_code == 409 + + def test_register_duplicate_email(self, client, registered_users): + r = client.post("/auth/register", json={ + "username": "different_name", + "email": "treader@test.com", + "password": "Pass1234!", + }) + assert r.status_code == 409 + + def test_login_success(self, client): + client.post("/auth/register", json={ + "username": "login_test_user", + "email": "logintest@test.com", + "password": "LoginPass1!", + }) + r = client.post("/auth/login", json={"username": "login_test_user", "password": "LoginPass1!"}) + assert r.status_code == 200 + body = r.json() + assert "access_token" in body + + def test_login_wrong_password(self, client, registered_users): + r = client.post("/auth/login", json={"username": "test_reader", "password": "wrongpassword"}) + assert r.status_code == 401 + + def test_login_nonexistent_user(self, client): + r = client.post("/auth/login", json={"username": "ghost_user", "password": "whatever"}) + assert r.status_code == 401 + + def test_get_me(self, client, registered_users): + token = registered_users["tokens"]["reader"] + r = client.get("/auth/me", headers=auth_header(token)) + assert r.status_code == 200 + assert r.json()["username"] == "test_reader" + + def test_get_me_unauthenticated(self, client): + r = client.get("/auth/me") + assert r.status_code == 401 + + def test_get_me_bad_token(self, client): + r = client.get("/auth/me", headers={"Authorization": "Bearer this.is.not.valid"}) + assert r.status_code == 401 + + +# ============================================================================ +# Posts +# ============================================================================ + +class TestPosts: + @pytest.fixture(scope="class") + def post_id(self, client, registered_users): + """Create one post as author1 and return its ID.""" + token = registered_users["tokens"]["author1"] + r = client.post("/posts", json={ + "title": "RBAC Deep Dive", + "content": "A comprehensive look at role-based access control.", + "tags": ["rbac", "security"], + }, headers=auth_header(token)) + assert r.status_code == 201, r.json() + return r.json()["id"] + + def test_list_posts_unauthenticated_sees_published_only(self, client, registered_users): + # Publish one post as editor so there is something visible + tok = registered_users["tokens"]["editor"] + r = client.post("/posts", json={ + "title": "Published by Editor", + "content": "Editor content", + "status": "published", + }, headers=auth_header(tok)) + assert r.status_code == 201 + + posts = client.get("/posts").json()["posts"] + for p in posts: + assert p["status"] == "published" + + def test_create_post_as_author(self, client, registered_users, post_id): + assert post_id is not None + + def test_create_post_as_reader_forbidden(self, client, registered_users): + token = registered_users["tokens"]["reader"] + r = client.post("/posts", json={ + "title": "Reader Attempting Post", + "content": "Should fail.", + }, headers=auth_header(token)) + assert r.status_code == 403 + + def test_create_post_unauthenticated(self, client): + r = client.post("/posts", json={"title": "Anon", "content": "Anon"}) + assert r.status_code == 401 + + def test_get_post(self, client, registered_users): + # Create and publish a post + tok = registered_users["tokens"]["author1"] + r = client.post("/posts", json={ + "title": "Visible Post", + "content": "Some content", + "status": "published", + }, headers=auth_header(tok)) + pid = r.json()["id"] + r2 = client.get(f"/posts/{pid}") + assert r2.status_code == 200 + assert r2.json()["id"] == pid + + def test_get_nonexistent_post(self, client): + r = client.get("/posts/nonexistent_id_xyz") + assert r.status_code == 404 + + def test_update_own_post(self, client, registered_users, post_id): + token = registered_users["tokens"]["author1"] + r = client.put(f"/posts/{post_id}", json={"title": "Updated Title"}, headers=auth_header(token)) + assert r.status_code == 200 + assert r.json()["title"] == "Updated Title" + + def test_update_others_post_as_reader_forbidden(self, client, registered_users, post_id): + token = registered_users["tokens"]["reader"] + r = client.put(f"/posts/{post_id}", json={"title": "Hacked"}, headers=auth_header(token)) + assert r.status_code == 403 + + def test_admin_can_update_any_post(self, client, registered_users, post_id): + token = registered_users["tokens"]["admin"] + r = client.put(f"/posts/{post_id}", json={"title": "Admin Updated"}, headers=auth_header(token)) + assert r.status_code == 200 + + def test_publish_post_as_editor(self, client, registered_users, post_id): + tok = registered_users["tokens"]["editor"] + r = client.put(f"/posts/{post_id}/publish", headers=auth_header(tok)) + assert r.status_code == 200 + + def test_publish_post_as_reader_forbidden(self, client, registered_users, post_id): + tok = registered_users["tokens"]["reader"] + r = client.put(f"/posts/{post_id}/publish", headers=auth_header(tok)) + assert r.status_code == 403 + + def test_delete_others_post_as_author_forbidden(self, client, registered_users, post_id): + tok = registered_users["tokens"]["author2"] + r = client.delete(f"/posts/{post_id}", headers=auth_header(tok)) + assert r.status_code == 403 + + def test_delete_own_post(self, client, registered_users): + tok = registered_users["tokens"]["author1"] + r = client.post("/posts", json={"title": "To Delete", "content": "bye"}, headers=auth_header(tok)) + pid = r.json()["id"] + r2 = client.delete(f"/posts/{pid}", headers=auth_header(tok)) + assert r2.status_code == 200 + + +# ============================================================================ +# Comments +# ============================================================================ + +class TestComments: + @pytest.fixture(scope="class") + def published_post_id(self, client, registered_users): + tok = registered_users["tokens"]["author1"] + r = client.post("/posts", json={ + "title": "Post With Comments", + "content": "Come comment here", + "status": "published", + }, headers=auth_header(tok)) + assert r.status_code == 201 + return r.json()["id"] + + def test_add_comment_as_reader(self, client, registered_users, published_post_id): + tok = registered_users["tokens"]["reader"] + r = client.post(f"/posts/{published_post_id}/comments", + json={"content": "Great post!"}, + headers=auth_header(tok)) + assert r.status_code == 201 + assert r.json()["content"] == "Great post!" + + def test_add_comment_unauthenticated(self, client, published_post_id): + r = client.post(f"/posts/{published_post_id}/comments", json={"content": "Anon comment"}) + assert r.status_code == 401 + + def test_get_comments(self, client, published_post_id): + r = client.get(f"/posts/{published_post_id}/comments") + assert r.status_code == 200 + body = r.json() + assert "comments" in body + assert body["total"] >= 1 + + def test_delete_own_comment(self, client, registered_users, published_post_id): + tok = registered_users["tokens"]["reader"] + r = client.post(f"/posts/{published_post_id}/comments", + json={"content": "Will delete this"}, + headers=auth_header(tok)) + cid = r.json()["id"] + r2 = client.delete(f"/comments/{cid}", headers=auth_header(tok)) + assert r2.status_code == 200 + + def test_delete_others_comment_as_reader_forbidden(self, client, registered_users, published_post_id): + # author1 adds a comment + tok_author = registered_users["tokens"]["author1"] + r = client.post(f"/posts/{published_post_id}/comments", + json={"content": "Author comment"}, + headers=auth_header(tok_author)) + cid = r.json()["id"] + # reader tries to delete it + tok_reader = registered_users["tokens"]["reader"] + r2 = client.delete(f"/comments/{cid}", headers=auth_header(tok_reader)) + assert r2.status_code == 403 + + def test_admin_can_delete_any_comment(self, client, registered_users, published_post_id): + tok_reader = registered_users["tokens"]["reader"] + r = client.post(f"/posts/{published_post_id}/comments", + json={"content": "To be nuked by admin"}, + headers=auth_header(tok_reader)) + cid = r.json()["id"] + tok_admin = registered_users["tokens"]["admin"] + r2 = client.delete(f"/comments/{cid}", headers=auth_header(tok_admin)) + assert r2.status_code == 200 + + +# ============================================================================ +# Admin +# ============================================================================ + +class TestAdmin: + def test_list_users_as_admin(self, client, registered_users): + tok = registered_users["tokens"]["admin"] + r = client.get("/admin/users", headers=auth_header(tok)) + assert r.status_code == 200 + body = r.json() + assert "users" in body + assert body["total"] >= 1 + + def test_list_users_as_reader_forbidden(self, client, registered_users): + tok = registered_users["tokens"]["reader"] + r = client.get("/admin/users", headers=auth_header(tok)) + assert r.status_code == 403 + + def test_list_users_unauthenticated(self, client): + r = client.get("/admin/users") + assert r.status_code == 401 + + def test_update_user_role_as_admin(self, client, registered_users): + tok = registered_users["tokens"]["admin"] + uid = registered_users["user_ids"]["reader"] + r = client.put(f"/admin/users/{uid}/role", + json={"role": "author"}, + headers=auth_header(tok)) + assert r.status_code == 200 + assert r.json()["role"] == "author" + # Restore + client.put(f"/admin/users/{uid}/role", + json={"role": "reader"}, + headers=auth_header(tok)) + + def test_update_own_role_forbidden(self, client, registered_users): + tok = registered_users["tokens"]["admin"] + uid = registered_users["user_ids"]["admin"] + r = client.put(f"/admin/users/{uid}/role", + json={"role": "reader"}, + headers=auth_header(tok)) + assert r.status_code == 400 + + def test_update_role_invalid_value(self, client, registered_users): + tok = registered_users["tokens"]["admin"] + uid = registered_users["user_ids"]["reader"] + r = client.put(f"/admin/users/{uid}/role", + json={"role": "superuser"}, + headers=auth_header(tok)) + # Pydantic validation returns 422 Unprocessable Entity for invalid enum values + assert r.status_code in (400, 422) + + def test_get_stats_as_admin(self, client, registered_users): + tok = registered_users["tokens"]["admin"] + r = client.get("/admin/stats", headers=auth_header(tok)) + assert r.status_code == 200 + body = r.json() + assert "total_users" in body + assert "total_posts" in body + + def test_get_stats_as_editor(self, client, registered_users): + tok = registered_users["tokens"]["editor"] + r = client.get("/admin/stats", headers=auth_header(tok)) + assert r.status_code == 200 + + def test_get_stats_as_reader_forbidden(self, client, registered_users): + tok = registered_users["tokens"]["reader"] + r = client.get("/admin/stats", headers=auth_header(tok)) + assert r.status_code == 403