diff --git a/src/rbac/core/models/__init__.py b/src/rbac/core/models/__init__.py index ba76a1a..712b0c4 100644 --- a/src/rbac/core/models/__init__.py +++ b/src/rbac/core/models/__init__.py @@ -14,6 +14,11 @@ import json +def _utcnow() -> datetime: + """Timezone-aware UTC now — used as a default_factory in dataclasses.""" + return datetime.now(timezone.utc) + + class EntityStatus(Enum): """Status of an entity in the system.""" ACTIVE = "active" @@ -55,8 +60,8 @@ class User: attributes: Dict[str, Any] = field(default_factory=dict) status: EntityStatus = EntityStatus.ACTIVE domain: Optional[str] = None - created_at: datetime = field(default_factory=datetime.utcnow) - updated_at: datetime = field(default_factory=datetime.utcnow) + created_at: datetime = field(default_factory=_utcnow) + updated_at: datetime = field(default_factory=_utcnow) def __post_init__(self): """Validate user data after initialization.""" @@ -167,7 +172,7 @@ class Permission: action: str description: Optional[str] = None conditions: Dict[str, Any] = field(default_factory=dict) - created_at: datetime = field(default_factory=datetime.utcnow) + created_at: datetime = field(default_factory=_utcnow) def __post_init__(self): """Validate permission data.""" @@ -286,8 +291,8 @@ class Resource: parent_id: Optional[str] = None status: EntityStatus = EntityStatus.ACTIVE domain: Optional[str] = None - created_at: datetime = field(default_factory=datetime.utcnow) - updated_at: datetime = field(default_factory=datetime.utcnow) + created_at: datetime = field(default_factory=_utcnow) + updated_at: datetime = field(default_factory=_utcnow) def __post_init__(self): """Validate resource data.""" diff --git a/src/rbac/storage/sqlalchemy_adapter.py b/src/rbac/storage/sqlalchemy_adapter.py index 585cba3..7e0ad38 100644 --- a/src/rbac/storage/sqlalchemy_adapter.py +++ b/src/rbac/storage/sqlalchemy_adapter.py @@ -75,6 +75,10 @@ # ORM declarations # --------------------------------------------------------------------------- +# FK target constant – avoids repeating the table.column string literal +_FK_ROLES_ID = "rbac_roles.id" + + class _Base(DeclarativeBase): """Shared declarative base for all RBAC ORM models.""" @@ -83,7 +87,7 @@ class _Base(DeclarativeBase): _role_permissions = Table( "rbac_role_permissions", _Base.metadata, - Column("role_id", String(255), ForeignKey("rbac_roles.id", ondelete="CASCADE"), nullable=False), + Column("role_id", String(255), ForeignKey(_FK_ROLES_ID, ondelete="CASCADE"), nullable=False), Column("permission_id", String(255), ForeignKey("rbac_permissions.id", ondelete="CASCADE"), nullable=False), UniqueConstraint("role_id", "permission_id", name="uq_role_permission"), ) @@ -119,7 +123,7 @@ class _RoleRow(_Base): id = Column(String(255), primary_key=True) name = Column(String(255), nullable=False) description = Column(Text, nullable=True) - parent_id = Column(String(255), ForeignKey("rbac_roles.id", ondelete="SET NULL"), nullable=True) + parent_id = Column(String(255), ForeignKey(_FK_ROLES_ID, ondelete="SET NULL"), nullable=True) domain = Column(String(255), nullable=True) status = Column(String(50), nullable=False, default="active") metadata_json = Column(Text, nullable=False, default="{}") @@ -175,7 +179,7 @@ class _RoleAssignmentRow(_Base): id = Column(String(255), primary_key=True) # composite surrogate user_id = Column(String(255), ForeignKey("rbac_users.id", ondelete="CASCADE"), nullable=False) - role_id = Column(String(255), ForeignKey("rbac_roles.id", ondelete="CASCADE"), nullable=False) + role_id = Column(String(255), ForeignKey(_FK_ROLES_ID, ondelete="CASCADE"), nullable=False) domain = Column(String(255), nullable=True) granted_by = Column(String(255), nullable=True) granted_at = Column(DateTime(timezone=True), nullable=False) @@ -398,7 +402,7 @@ def __init__( kwargs["connect_args"] = {"check_same_thread": False} self._engine = create_engine(database_url, **kwargs) - self._Session = sessionmaker(bind=self._engine, expire_on_commit=False) + self._session_factory = sessionmaker(bind=self._engine, expire_on_commit=False) # ------------------------------------------------------------------ # Lifecycle @@ -433,7 +437,7 @@ def _validate_role_assignment(self, assignment: RoleAssignment) -> None: @contextmanager def _session(self) -> Generator[Session, None, None]: """Provide a transactional session scope.""" - session: Session = self._Session() + session: Session = self._session_factory() try: yield session session.commit() diff --git a/test-apps/02-flask-blog-api/app.py b/test-apps/02-flask-blog-api/app.py index 302ed24..72647dd 100644 --- a/test-apps/02-flask-blog-api/app.py +++ b/test-apps/02-flask-blog-api/app.py @@ -1,17 +1,18 @@ -""" +""" Flask Blog API - Main Application A complete REST API demonstrating RBAC Algorithm integration. """ -from flask import Flask, jsonify, request, g -from flask_cors import CORS from datetime import datetime, timezone +from flask import Blueprint, Flask, current_app, g, jsonify, request +from flask_cors import CORS + # Local imports +from auth import AuthManager, optional_auth, require_auth from config import get_config -from auth import AuthManager, require_auth, optional_auth -from decorators import require_permission, require_role, require_admin -from storage import InMemoryStorage +from decorators import require_permission, require_admin from models import PostStatus +from storage import InMemoryStorage # RBAC imports from rbac import RBAC @@ -23,555 +24,426 @@ MSG_AUTH_REQUIRED = 'Authentication required' MSG_LOGIN_REQUIRED = 'You must be logged in to perform this action' +# --------------------------------------------------------------------------- +# Blueprints +# --------------------------------------------------------------------------- +main_bp = Blueprint('main', __name__) +auth_bp = Blueprint('auth', __name__, url_prefix='/auth') +posts_bp = Blueprint('posts', __name__, url_prefix='/posts') +comments_bp = Blueprint('comments', __name__, url_prefix='/comments') +admin_bp = Blueprint('admin', __name__, url_prefix='/admin') + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- +VALID_ROLES = ['admin', 'editor', 'author', 'reader'] + + +def _not_found(msg='The requested resource was not found'): + return jsonify({'error': ERROR_NOT_FOUND, 'message': msg}), 404 + + +def _validation_error(msg): + return jsonify({'error': ERROR_VALIDATION, 'message': msg}), 400 + + +# --------------------------------------------------------------------------- +# Main routes +# --------------------------------------------------------------------------- + +@main_bp.route('/health', methods=['GET']) +def health_check(): + """Health check endpoint.""" + return jsonify({ + 'status': 'healthy', + 'timestamp': datetime.now(timezone.utc).isoformat(), + 'version': current_app.config['API_VERSION'], + }) + + +@main_bp.route('/', methods=['GET']) +def index(): + """API information.""" + return jsonify({ + 'name': current_app.config['API_TITLE'], + 'version': current_app.config['API_VERSION'], + 'description': current_app.config['API_DESCRIPTION'], + 'endpoints': { + 'auth': '/auth/*', + 'posts': '/posts', + 'comments': '/posts//comments', + 'admin': '/admin/*', + 'health': '/health', + }, + }) + + +# --------------------------------------------------------------------------- +# Auth routes +# --------------------------------------------------------------------------- + +@auth_bp.route('/register', methods=['POST']) +def register(): + """Register a new user.""" + data = request.get_json() + username = data.get('username', '').strip() + email = data.get('email', '').strip() + password = data.get('password', '') + role = data.get('role', 'reader').lower() + + if not username or not email or not password: + return _validation_error('Username, email, and password are required') + + if role not in VALID_ROLES: + return _validation_error(f'Role must be one of: {", ".join(VALID_ROLES)}') + + storage = g.storage + if storage.get_user_by_username(username): + return jsonify({'error': 'Conflict', 'message': 'Username already exists'}), 409 + + if storage.get_user_by_email(email): + return jsonify({'error': 'Conflict', 'message': 'Email already exists'}), 409 + + auth_manager = g.auth_manager + password_hash = auth_manager.hash_password(password) + user = storage.create_user(username, email, password_hash, role) + + rbac = g.rbac + rbac.create_user(user_id=f"user_{user.id}", email=user.email, name=user.username) + rbac.assign_role(f"user_{user.id}", f"role_{role}") + + return jsonify({'message': 'User registered successfully', 'user': user.to_public_dict()}), 201 + + +@auth_bp.route('/login', methods=['POST']) +def login(): + """Login and get JWT token.""" + data = request.get_json() + username = data.get('username', '').strip() + password = data.get('password', '') + + if not username or not password: + return _validation_error('Username and password are required') + + storage = g.storage + auth_manager = g.auth_manager + user = storage.get_user_by_username(username) + + if not user or not auth_manager.verify_password(password, user.password_hash): + return jsonify({'error': 'Authentication failed', 'message': 'Invalid username or password'}), 401 + + token = auth_manager.generate_token(user.id, user.username, user.role) + return jsonify({ + 'token': token, + 'user': user.to_public_dict(), + 'expires_in': int(current_app.config['JWT_EXPIRATION'].total_seconds()), + }) + + +@auth_bp.route('/me', methods=['GET']) +@require_auth +def get_current_user_info(): + """Get current user info.""" + user = g.current_user + user_data = g.storage.get_user(user['user_id']) + if not user_data: + return _not_found('User not found') + return jsonify(user_data.to_dict()) + + +# --------------------------------------------------------------------------- +# Post routes +# --------------------------------------------------------------------------- + +@posts_bp.route('', methods=['GET']) +@optional_auth +def list_posts(): + """List all posts.""" + storage = g.storage + current_user = g.current_user + + if current_user: + own_posts = storage.list_posts(author_id=current_user['user_id']) + all_published = storage.list_posts(status=PostStatus.PUBLISHED) + seen = set() + posts = [] + for post in own_posts + all_published: + if post.id not in seen: + seen.add(post.id) + posts.append(post) + posts.sort(key=lambda p: p.created_at, reverse=True) + else: + posts = storage.list_posts(status=PostStatus.PUBLISHED) + + return jsonify({'posts': [p.to_summary_dict() for p in posts], 'count': len(posts)}) + + +@posts_bp.route('/', methods=['GET']) +@optional_auth +def get_post(post_id): + """Get a specific post.""" + post = g.storage.get_post(str(post_id)) + if not post: + return _not_found('Post not found') + + if post.status == PostStatus.PUBLISHED: + return jsonify(post.to_dict()) + + current_user = g.current_user + if not current_user or current_user['user_id'] != post.author_id: + return jsonify({'error': 'Forbidden', 'message': 'You do not have permission to view this post'}), 403 + + return jsonify(post.to_dict()) + + +@posts_bp.route('', methods=['POST']) +@require_auth +@require_permission('create', 'post') +def create_post(): + """Create a new post.""" + data = request.get_json() + user = g.current_user + title = data.get('title', '').strip() + content = data.get('content', '').strip() + status = data.get('status', 'draft').lower() + tags = data.get('tags', []) + + if not title or not content: + return _validation_error('Title and content are required') + + try: + status_enum = PostStatus(status) + except ValueError: + return _validation_error(f'Invalid status. Must be one of: {", ".join(s.value for s in PostStatus)}') + + post = g.storage.create_post( + title=title, + content=content, + author_id=user['user_id'], + author_username=user['username'], + status=status_enum, + tags=tags, + ) + return jsonify({'message': 'Post created successfully', 'post': post.to_dict()}), 201 + + +@posts_bp.route('/', methods=['PUT']) +@require_auth +@require_permission('update', 'post', check_ownership=True) +def update_post(post_id): + """Update a post.""" + data = request.get_json() + title = data.get('title') + content = data.get('content') + status = data.get('status') + tags = data.get('tags') + + status_enum = None + if status: + try: + status_enum = PostStatus(status.lower()) + except ValueError: + return _validation_error(f'Invalid status. Must be one of: {", ".join(s.value for s in PostStatus)}') + + updated_post = g.storage.update_post( + str(post_id), title=title, content=content, status=status_enum, tags=tags + ) + return jsonify({'message': 'Post updated successfully', 'post': updated_post.to_dict()}) + + +@posts_bp.route('/', methods=['DELETE']) +@require_auth +@require_permission('delete', 'post', check_ownership=True) +def delete_post(post_id): + """Delete a post.""" + if g.storage.delete_post(str(post_id)): + return jsonify({'message': 'Post deleted successfully'}) + return _not_found('Post not found') + + +@posts_bp.route('//publish', methods=['POST']) +@require_auth +@require_permission('publish', 'post') +def publish_post(post_id): + """Publish a post.""" + post = g.storage.get_post(str(post_id)) + if not post: + return _not_found('Post not found') + updated_post = g.storage.update_post(str(post_id), status=PostStatus.PUBLISHED) + return jsonify({'message': 'Post published successfully', 'post': updated_post.to_dict()}) + + +@posts_bp.route('//comments', methods=['GET']) +def list_comments(post_id): + """List comments for a post.""" + if not g.storage.get_post(str(post_id)): + return _not_found('Post not found') + comments = g.storage.list_comments(str(post_id)) + return jsonify({'comments': [c.to_dict() for c in comments], 'count': len(comments)}) + + +@posts_bp.route('//comments', methods=['POST']) +@require_auth +@require_permission('create', 'comment') +def create_comment(post_id): + """Add a comment to a post.""" + data = request.get_json() + user = g.current_user + content = data.get('content', '').strip() + + if not content: + return _validation_error('Content is required') + + comment = g.storage.create_comment( + post_id=str(post_id), + content=content, + author_id=user['user_id'], + author_username=user['username'], + ) + if not comment: + return _not_found('Post not found') + return jsonify({'message': 'Comment created successfully', 'comment': comment.to_dict()}), 201 + + +# --------------------------------------------------------------------------- +# Comment routes +# --------------------------------------------------------------------------- + +@comments_bp.route('/', methods=['DELETE']) +@require_auth +@require_permission('delete', 'comment', check_ownership=True) +def delete_comment(comment_id): + """Delete a comment.""" + if g.storage.delete_comment(str(comment_id), soft=True): + return jsonify({'message': 'Comment deleted successfully'}) + return _not_found('Comment not found') + + +# --------------------------------------------------------------------------- +# Admin routes +# --------------------------------------------------------------------------- + +@admin_bp.route('/users', methods=['GET']) +@require_auth +@require_admin +def list_users(): + """List all users (admin only).""" + users = g.storage.list_users() + return jsonify({'users': [u.to_dict() for u in users], 'count': len(users)}) + + +@admin_bp.route('/users//role', methods=['PUT']) +@require_auth +@require_admin +def update_user_role(user_id): + """Update a user's role (admin only).""" + data = request.get_json() + new_role = data.get('role', '').lower() + + if new_role not in VALID_ROLES: + return _validation_error(f'Role must be one of: {", ".join(VALID_ROLES)}') + + user = g.storage.update_user_role(str(user_id), new_role) + if not user: + return _not_found('User not found') + + rbac = g.rbac + rbac_user_id = f"user_{user_id}" + try: + rbac.assign_role(rbac_user_id, f"role_{new_role}") + except Exception: + try: + rbac.create_user(user_id=rbac_user_id, email=user.email, name=user.username) + rbac.assign_role(rbac_user_id, f"role_{new_role}") + except Exception: + pass # User might already exist with the role + + return jsonify({'message': 'User role updated successfully', 'user': user.to_dict()}) + + +@admin_bp.route('/stats', methods=['GET']) +@require_auth +@require_admin +def get_stats(): + """Get system statistics (admin only).""" + return jsonify(g.storage.get_stats().to_dict()) + + +# --------------------------------------------------------------------------- +# Application factory +# --------------------------------------------------------------------------- def create_app(config_name='development'): """Application factory.""" app = Flask(__name__) app.config.from_object(get_config(config_name)) - - # Enable CORS + CORS(app, origins=app.config['CORS_ORIGINS']) - - # Initialize storage + storage = InMemoryStorage() - - # Initialize RBAC (using in-memory storage) rbac = RBAC(storage='memory') - - # Initialize auth manager auth_manager = AuthManager(app.config) - - # Store instances for access in routes + + # Store on app for test-client access app.storage = storage app.rbac = rbac app.auth_manager = auth_manager - - # Setup RBAC roles and permissions + setup_rbac(rbac) - - # Load seed data + from seed_data import load_seed_data load_seed_data(storage, rbac, auth_manager) - - # ==================== Before Request ==================== - + @app.before_request - def before_request(): - """Set up request context.""" + def _push_to_g(): g.storage = storage g.rbac = rbac g.auth_manager = auth_manager - - # ==================== Error Handlers ==================== - + @app.errorhandler(404) - def not_found(error): - return jsonify({ - 'error': ERROR_NOT_FOUND, - 'message': 'The requested resource was not found' - }), 404 - + def _not_found_handler(error): # noqa: ARG001 + return jsonify({'error': ERROR_NOT_FOUND, 'message': 'The requested resource was not found'}), 404 + @app.errorhandler(500) - def internal_error(error): - return jsonify({ - 'error': 'Internal server error', - 'message': 'An unexpected error occurred' - }), 500 - - # ==================== Health Check ==================== - - @app.route('/health', methods=['GET']) - def health_check(): - """Health check endpoint.""" - return jsonify({ - 'status': 'healthy', - 'timestamp': datetime.utcnow().isoformat(), - 'version': app.config['API_VERSION'] - }) - - @app.route('/', methods=['GET']) - def index(): - """API information.""" - return jsonify({ - 'name': app.config['API_TITLE'], - 'version': app.config['API_VERSION'], - 'description': app.config['API_DESCRIPTION'], - 'endpoints': { - 'auth': '/auth/*', - 'posts': '/posts', - 'comments': '/posts//comments', - 'admin': '/admin/*', - 'health': '/health' - } - }) - - # ==================== Authentication Routes ==================== - - @app.route('/auth/register', methods=['POST']) - def register(): - """Register a new user.""" - data = request.get_json() - - # Validate input - username = data.get('username', '').strip() - email = data.get('email', '').strip() - password = data.get('password', '') - role = data.get('role', 'reader').lower() - - if not username or not email or not password: - return jsonify({ - 'error': 'Validation error', - 'message': 'Username, email, and password are required' - }), 400 - - # Validate role - valid_roles = ['admin', 'editor', 'author', 'reader'] - if role not in valid_roles: - return jsonify({ - 'error': 'Validation error', - 'message': f'Role must be one of: {", ".join(valid_roles)}' - }), 400 - - # Check if username or email already exists - if storage.get_user_by_username(username): - return jsonify({ - 'error': 'Conflict', - 'message': 'Username already exists' - }), 409 - - if storage.get_user_by_email(email): - return jsonify({ - 'error': 'Conflict', - 'message': 'Email already exists' - }), 409 - - # Create user - password_hash = auth_manager.hash_password(password) - user = storage.create_user(username, email, password_hash, role) - - # Add user to RBAC system - rbac.create_user( - user_id=f"user_{user.id}", - email=user.email, - name=user.username - ) - rbac.assign_role(f"user_{user.id}", f"role_{role}") - - return jsonify({ - 'message': 'User registered successfully', - 'user': user.to_public_dict() - }), 201 - - @app.route('/auth/login', methods=['POST']) - def login(): - """Login and get JWT token.""" - data = request.get_json() - - username = data.get('username', '').strip() - password = data.get('password', '') - - if not username or not password: - return jsonify({ - 'error': 'Validation error', - 'message': 'Username and password are required' - }), 400 - - # Get user - user = storage.get_user_by_username(username) - - if not user or not auth_manager.verify_password(password, user.password_hash): - return jsonify({ - 'error': 'Authentication failed', - 'message': 'Invalid username or password' - }), 401 - - # Generate token - token = auth_manager.generate_token(user.id, user.username, user.role) - - return jsonify({ - 'token': token, - 'user': user.to_public_dict(), - 'expires_in': int(app.config['JWT_EXPIRATION'].total_seconds()) - }) - - @app.route('/auth/me', methods=['GET']) - @require_auth - def get_current_user(): - """Get current user info.""" - user = g.current_user - user_data = storage.get_user(user['user_id']) - - if not user_data: - return jsonify({ - 'error': ERROR_NOT_FOUND, - 'message': 'User not found' - }), 404 - - return jsonify(user_data.to_dict()) - - # ==================== Post Routes ==================== - - @app.route('/posts', methods=['GET']) - @optional_auth - def list_posts(): - """List all posts (public or filtered).""" - # Anonymous users see only published posts - # Authenticated users see all their posts + published posts from others - - current_user = g.current_user - - if current_user: - # Get user's own posts (all statuses) - own_posts = storage.list_posts(author_id=current_user['user_id']) - # Get other's published posts - all_posts = storage.list_posts(status=PostStatus.PUBLISHED) - - # Combine and deduplicate - post_ids = set() - posts = [] - for post in own_posts + all_posts: - if post.id not in post_ids: - post_ids.add(post.id) - posts.append(post) - - # Sort by created_at descending - posts.sort(key=lambda p: p.created_at, reverse=True) - else: - # Only published posts for anonymous users - posts = storage.list_posts(status=PostStatus.PUBLISHED) - - return jsonify({ - 'posts': [p.to_summary_dict() for p in posts], - 'count': len(posts) - }) - - @app.route('/posts/', methods=['GET']) - @optional_auth - def get_post(post_id): - """Get a specific post.""" - post = storage.get_post(str(post_id)) - - if not post: - return jsonify({ - 'error': 'Not found', - 'message': 'Post not found' - }), 404 - - # Check if user can view this post - current_user = g.current_user - - # Published posts are public - if post.status == PostStatus.PUBLISHED: - return jsonify(post.to_dict()) - - # Non-published posts require ownership - if not current_user or current_user['user_id'] != post.author_id: - return jsonify({ - 'error': 'Forbidden', - 'message': 'You do not have permission to view this post' - }), 403 - - return jsonify(post.to_dict()) - - @app.route('/posts', methods=['POST']) - @require_auth - @require_permission('create', 'post') - def create_post(): - """Create a new post.""" - data = request.get_json() - user = g.current_user - - title = data.get('title', '').strip() - content = data.get('content', '').strip() - status = data.get('status', 'draft').lower() - tags = data.get('tags', []) - - if not title or not content: - return jsonify({ - 'error': 'Validation error', - 'message': 'Title and content are required' - }), 400 - - # Validate status - try: - status_enum = PostStatus(status) - except ValueError: - return jsonify({ - 'error': 'Validation error', - 'message': f'Invalid status. Must be one of: {", ".join([s.value for s in PostStatus])}' - }), 400 - - # Create post - post = storage.create_post( - title=title, - content=content, - author_id=user['user_id'], - author_username=user['username'], - status=status_enum, - tags=tags - ) - - return jsonify({ - 'message': 'Post created successfully', - 'post': post.to_dict() - }), 201 - - @app.route('/posts/', methods=['PUT']) - @require_auth - @require_permission('update', 'post', check_ownership=True) - def update_post(post_id): - """Update a post (must be owner or editor/admin).""" - data = request.get_json() - # Resource validated by decorator - - title = data.get('title') - content = data.get('content') - status = data.get('status') - tags = data.get('tags') - - # Validate status if provided - status_enum = None - if status: - try: - status_enum = PostStatus(status.lower()) - except ValueError: - return jsonify({ - 'error': 'Validation error', - 'message': f'Invalid status. Must be one of: {", ".join([s.value for s in PostStatus])}' - }), 400 - - # Update post - updated_post = storage.update_post( - str(post_id), - title=title, - content=content, - status=status_enum, - tags=tags - ) - - return jsonify({ - 'message': 'Post updated successfully', - 'post': updated_post.to_dict() - }) - - @app.route('/posts/', methods=['DELETE']) - @require_auth - @require_permission('delete', 'post', check_ownership=True) - def delete_post(post_id): - """Delete a post (must be owner or editor/admin).""" - success = storage.delete_post(str(post_id)) - - if success: - return jsonify({ - 'message': 'Post deleted successfully' - }) - - return jsonify({ - 'error': 'Not found', - 'message': 'Post not found' - }), 404 - - @app.route('/posts//publish', methods=['POST']) - @require_auth - @require_permission('publish', 'post') - def publish_post(post_id): - """Publish a post (requires publish permission).""" - post = storage.get_post(str(post_id)) - - if not post: - return jsonify({ - 'error': 'Not found', - 'message': 'Post not found' - }), 404 - - # Update to published - updated_post = storage.update_post(str(post_id), status=PostStatus.PUBLISHED) - - return jsonify({ - 'message': 'Post published successfully', - 'post': updated_post.to_dict() - }) - - # ==================== Comment Routes ==================== - - @app.route('/posts//comments', methods=['GET']) - def list_comments(post_id): - """List comments for a post.""" - # Check if post exists - post = storage.get_post(str(post_id)) - if not post: - return jsonify({ - 'error': 'Not found', - 'message': 'Post not found' - }), 404 - - comments = storage.list_comments(str(post_id)) - - return jsonify({ - 'comments': [c.to_dict() for c in comments], - 'count': len(comments) - }) - - @app.route('/posts//comments', methods=['POST']) - @require_auth - @require_permission('create', 'comment') - def create_comment(post_id): - """Add a comment to a post.""" - data = request.get_json() - user = g.current_user - - content = data.get('content', '').strip() - - if not content: - return jsonify({ - 'error': 'Validation error', - 'message': 'Content is required' - }), 400 - - comment = storage.create_comment( - post_id=str(post_id), - content=content, - author_id=user['user_id'], - author_username=user['username'] - ) - - if not comment: - return jsonify({ - 'error': 'Not found', - 'message': 'Post not found' - }), 404 - - return jsonify({ - 'message': 'Comment created successfully', - 'comment': comment.to_dict() - }), 201 - - @app.route('/comments/', methods=['DELETE']) - @require_auth - @require_permission('delete', 'comment', check_ownership=True) - def delete_comment(comment_id): - """Delete a comment (must be owner or moderator).""" - success = storage.delete_comment(str(comment_id), soft=True) - - if success: - return jsonify({ - 'message': 'Comment deleted successfully' - }) - - return jsonify({ - 'error': 'Not found', - 'message': 'Comment not found' - }), 404 - - # ==================== Admin Routes ==================== - - @app.route('/admin/users', methods=['GET']) - @require_auth - @require_admin - def list_users(): - """List all users (admin only).""" - users = storage.list_users() - - return jsonify({ - 'users': [u.to_dict() for u in users], - 'count': len(users) - }) - - @app.route('/admin/users//role', methods=['PUT']) - @require_auth - @require_admin - def update_user_role(user_id): - """Update a user's role (admin only).""" - data = request.get_json() - new_role = data.get('role', '').lower() - - valid_roles = ['admin', 'editor', 'author', 'reader'] - if new_role not in valid_roles: - return jsonify({ - 'error': 'Validation error', - 'message': f'Role must be one of: {", ".join(valid_roles)}' - }), 400 - - user = storage.update_user_role(str(user_id), new_role) - - if not user: - return jsonify({ - 'error': 'Not found', - 'message': 'User not found' - }), 404 - - # Update RBAC role - remove old role and assign new one - rbac_user_id = f"user_{user_id}" - try: - # Get current roles - user_details = rbac.get_user(rbac_user_id) - if user_details: - # Note: The RBAC library should have revoke_role - # by reassigning - the library should handle this internally - pass - - # Assign new role - rbac.assign_role(rbac_user_id, f"role_{new_role}") - except Exception: - # If user doesn't exist in RBAC, create them - try: - rbac.create_user( - user_id=rbac_user_id, - email=user.email, - name=user.username - ) - rbac.assign_role(rbac_user_id, f"role_{new_role}") - except Exception: - pass # User might already exist - - return jsonify({ - 'message': 'User role updated successfully', - 'user': user.to_dict() - }) - - @app.route('/admin/stats', methods=['GET']) - @require_auth - @require_admin - def get_stats(): - """Get system statistics (admin only).""" - stats = storage.get_stats() - - return jsonify(stats.to_dict()) - + def _server_error_handler(error): # noqa: ARG001 + return jsonify({'error': 'Internal server error', 'message': 'An unexpected error occurred'}), 500 + + # Register blueprints + app.register_blueprint(main_bp) + app.register_blueprint(auth_bp) + app.register_blueprint(posts_bp) + app.register_blueprint(comments_bp) + app.register_blueprint(admin_bp) + return app +# --------------------------------------------------------------------------- +# RBAC setup +# --------------------------------------------------------------------------- + def setup_rbac(rbac: RBAC): """Set up RBAC roles and permissions.""" - - # Define permissions permissions_data = [ - # Post permissions ('perm_post_create', 'post', 'create', 'Create blog posts'), ('perm_post_read', 'post', 'read', 'Read blog posts'), ('perm_post_update', 'post', 'update', 'Update blog posts'), ('perm_post_delete', 'post', 'delete', 'Delete blog posts'), ('perm_post_publish', 'post', 'publish', 'Publish blog posts'), - - # Comment permissions ('perm_comment_create', 'comment', 'create', 'Create comments'), ('perm_comment_read', 'comment', 'read', 'Read comments'), ('perm_comment_delete', 'comment', 'delete', 'Delete comments'), - - # User permissions ('perm_user_manage', 'user', 'manage', 'Manage users'), ('perm_stats_view', 'stats', 'view', 'View statistics'), ] - for perm_id, resource_type, action, description in permissions_data: rbac.create_permission( permission_id=perm_id, resource_type=resource_type, action=action, - description=description + description=description, ) - - # Define roles and their permissions + roles_data = { 'admin': { 'name': 'Administrator', @@ -579,42 +451,39 @@ def setup_rbac(rbac: RBAC): 'perm_post_create', 'perm_post_read', 'perm_post_update', 'perm_post_delete', 'perm_post_publish', 'perm_comment_create', 'perm_comment_read', 'perm_comment_delete', - 'perm_user_manage', 'perm_stats_view' + 'perm_user_manage', 'perm_stats_view', ], - 'description': 'Full access to all resources' + 'description': 'Full access to all resources', }, 'editor': { 'name': 'Editor', 'permissions': [ 'perm_post_create', 'perm_post_read', 'perm_post_update', 'perm_post_delete', 'perm_post_publish', - 'perm_comment_create', 'perm_comment_read', 'perm_comment_delete' + 'perm_comment_create', 'perm_comment_read', 'perm_comment_delete', ], - 'description': 'Can manage all content' + 'description': 'Can manage all content', }, 'author': { 'name': 'Author', 'permissions': [ 'perm_post_create', 'perm_post_read', 'perm_post_update', 'perm_post_delete', - 'perm_comment_create', 'perm_comment_read' + 'perm_comment_create', 'perm_comment_read', ], - 'description': 'Can create and manage own posts' + 'description': 'Can create and manage own posts', }, 'reader': { 'name': 'Reader', - 'permissions': [ - 'perm_post_read', 'perm_comment_read', 'perm_comment_create' - ], - 'description': 'Can read content and add comments' - } + 'permissions': ['perm_post_read', 'perm_comment_read', 'perm_comment_create', 'perm_comment_delete'], + 'description': 'Can read content and add comments (including deleting own comments)', + }, } - for role_id, role_data in roles_data.items(): rbac.create_role( role_id=f'role_{role_id}', name=role_data['name'], permissions=role_data['permissions'], - description=role_data['description'] + description=role_data['description'], ) @@ -637,6 +506,4 @@ def setup_rbac(rbac: RBAC): ║ ║ ╚════════════════════════════════════════════════╝ """) - # Use debug flag from app config (controlled via FLASK_DEBUG env var) rather than - # hardcoding debug=True, which would enable the interactive debugger in production. app.run(debug=app.config.get('DEBUG', False), host='0.0.0.0', port=5000) diff --git a/test-apps/02-flask-blog-api/decorators.py b/test-apps/02-flask-blog-api/decorators.py index 21692d4..c9882de 100644 --- a/test-apps/02-flask-blog-api/decorators.py +++ b/test-apps/02-flask-blog-api/decorators.py @@ -9,117 +9,149 @@ logger = logging.getLogger(__name__) +# Error/message constants to avoid duplication +MSG_AUTH_REQUIRED = 'Authentication required' +MSG_LOGIN_REQUIRED = 'You must be logged in to perform this action' + + +# --------------------------------------------------------------------------- +# Private helpers (reduce cognitive complexity in the decorators below) +# --------------------------------------------------------------------------- + +# Roles that can perform ownership-protected actions on ANY resource +OWNERSHIP_OVERRIDE_ROLES = frozenset({'admin', 'editor'}) + + +def _fetch_owned_resource(storage, resource_type, kwargs): + """Return the owned resource for an ownership check, or (None, error) if not found.""" + resource_id = kwargs.get('post_id') or kwargs.get('comment_id') or kwargs.get('id') + if not resource_id: + return None, None + + resource_id = str(resource_id) # Flask URL converters may yield int + if 'post' in (resource_type or ''): + resource = storage.get_post(resource_id) + elif 'comment' in (resource_type or ''): + resource = storage.get_comment(resource_id) + else: + resource = None + + if not resource: + error = ( + jsonify({'error': 'Not found', 'message': f'{resource_type.capitalize()} not found'}), + 404, + ) + return None, error + return resource, None + + +def _build_permission_context(user, resource): + """Build the ABAC context dict used during the RBAC check.""" + context = { + 'user_id': user['user_id'], + 'username': user['username'], + 'role': user['role'], + } + if resource: + owner_id = getattr(resource, 'author_id', None) or getattr(resource, 'user_id', None) + context['resource_owner'] = owner_id + context['is_owner'] = owner_id == user['user_id'] + return context + + +def _forbidden_response(check_ownership, resource, context, action, resource_type): + """Return a 403 response tuple for a failed permission check.""" + if check_ownership and resource and not context.get('is_owner'): + return jsonify({ + 'error': 'Forbidden', + 'message': 'You can only modify your own content', + 'reason': 'ownership_required', + }), 403 + return jsonify({ + 'error': 'Forbidden', + 'message': f'You do not have permission to {action} {resource_type}', + 'reason': 'permission_denied', + }), 403 + def require_permission(action: str, resource_type: str = None, check_ownership: bool = False): """ Decorator to require specific permission for a route. - + Args: action: The action to check (e.g., 'create', 'read', 'update', 'delete') resource_type: The resource type (e.g., 'post', 'comment'). If None, uses action only check_ownership: If True, check if user owns the resource (for update/delete operations) - + Usage: @app.route('/posts', methods=['POST']) @require_auth @require_permission('create', 'post') def create_post(): - # User has 'create:post' permission return jsonify({'message': 'Post created'}) - + @app.route('/posts/', methods=['PUT']) @require_auth @require_permission('update', 'post', check_ownership=True) def update_post(post_id): - # User has 'update:post' permission AND owns the post return jsonify({'message': 'Post updated'}) """ def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): user = get_current_user() - + if not user: return jsonify({ - 'error': 'Authentication required', - 'message': 'You must be logged in to perform this action' + 'error': MSG_AUTH_REQUIRED, + 'message': MSG_LOGIN_REQUIRED, }), 401 - - # Get RBAC engine from Flask's g object + rbac = g.rbac storage = g.storage - - # Get resource for ownership check + + # Resolve resource for ownership checks resource = None if check_ownership: - # Extract resource ID from URL parameters - resource_id = kwargs.get('post_id') or kwargs.get('comment_id') or kwargs.get('id') - - if resource_id: - # Get resource from storage to check ownership - if 'post' in (resource_type or ''): - resource = storage.get_post(resource_id) - elif 'comment' in (resource_type or ''): - resource = storage.get_comment(resource_id) - - if not resource: - return jsonify({ - 'error': 'Not found', - 'message': f'{resource_type.capitalize()} not found' - }), 404 - - # Check permission with RBAC engine + resource, err = _fetch_owned_resource(storage, resource_type, kwargs) + if err is not None: + return err + try: - # Build context for ABAC (attribute-based) checks - context = { - 'user_id': user['user_id'], - 'username': user['username'], - 'role': user['role'] - } - - # Add resource info to context for ownership checks - if resource: - context['resource_owner'] = getattr(resource, 'author_id', None) or getattr(resource, 'user_id', None) - context['is_owner'] = context['resource_owner'] == user['user_id'] - - # Perform authorization check using user_id with 'user_' prefix + context = _build_permission_context(user, resource) rbac_user_id = f"user_{user['user_id']}" can_access = rbac.can( user_id=rbac_user_id, action=action, resource=resource_type, - context=context + context=context, ) - + if not can_access: - # If ownership check required but user doesn't own the resource - if check_ownership and resource and not context.get('is_owner'): - return jsonify({ - 'error': 'Forbidden', - 'message': 'You can only modify your own content', - 'reason': 'ownership_required' - }), 403 - - return jsonify({ - 'error': 'Forbidden', - 'message': f'You do not have permission to {action} {resource_type}', - 'reason': 'permission_denied' - }), 403 - - # Store resource in g for handler use + return _forbidden_response(check_ownership, resource, context, action, resource_type) + + # Ownership enforcement: if the resource is found and the user does not + # own it, only override roles (admin/editor) may proceed. + if ( + check_ownership + and resource + and not context.get('is_owner') + and user.get('role', '') not in OWNERSHIP_OVERRIDE_ROLES + ): + return _forbidden_response(True, resource, context, action, resource_type) + if resource: g.resource = resource - + return f(*args, **kwargs) - + except Exception as e: - # Log the full error internally; do not expose exception details to the client logger.error('Authorization check failed: %s', str(e), exc_info=True) return jsonify({ 'error': 'Authorization error', - 'message': 'Failed to check permissions' + 'message': 'Failed to check permissions', }), 500 - + return decorated_function return decorator @@ -154,8 +186,8 @@ def decorated_function(*args, **kwargs): if not user: return jsonify({ - 'error': 'Authentication required', - 'message': 'You must be logged in to perform this action' + 'error': MSG_AUTH_REQUIRED, + 'message': MSG_LOGIN_REQUIRED }), 401 user_role = user.get('role', '') @@ -191,8 +223,8 @@ def decorated_function(*args, **kwargs): if not user: return jsonify({ - 'error': 'Authentication required', - 'message': 'You must be logged in to perform this action' + 'error': MSG_AUTH_REQUIRED, + 'message': MSG_LOGIN_REQUIRED }), 401 if user.get('role') != 'admin': diff --git a/test-apps/02-flask-blog-api/models.py b/test-apps/02-flask-blog-api/models.py index 7660b03..fa22103 100644 --- a/test-apps/02-flask-blog-api/models.py +++ b/test-apps/02-flask-blog-api/models.py @@ -8,6 +8,10 @@ from enum import Enum +def _utcnow() -> datetime: + return datetime.now(timezone.utc) + + class PostStatus(str, Enum): """Post status enumeration.""" DRAFT = 'draft' @@ -23,7 +27,7 @@ class User: email: str password_hash: str role: str # admin, editor, author, reader - created_at: datetime = field(default_factory=datetime.utcnow) + created_at: datetime = field(default_factory=_utcnow) def to_dict(self, include_password: bool = False) -> dict: """Convert user to dictionary (excluding password by default).""" diff --git a/test-apps/02-flask-blog-api/storage.py b/test-apps/02-flask-blog-api/storage.py index c1a3f6d..2dc15e5 100644 --- a/test-apps/02-flask-blog-api/storage.py +++ b/test-apps/02-flask-blog-api/storage.py @@ -93,7 +93,7 @@ def create_post(self, title: str, content: str, author_id: str, ) if status == PostStatus.PUBLISHED: - post.published_at = datetime.utcnow() + post.published_at = datetime.now(timezone.utc) self.posts[post_id] = post return post @@ -141,12 +141,12 @@ def update_post(self, post_id: str, title: Optional[str] = None, post.status = status # Set published_at when transitioning to published if status == PostStatus.PUBLISHED and old_status != PostStatus.PUBLISHED: - post.published_at = datetime.utcnow() + post.published_at = datetime.now(timezone.utc) if tags is not None: post.tags = tags - post.updated_at = datetime.utcnow() + post.updated_at = datetime.now(timezone.utc) return post @@ -213,7 +213,7 @@ def update_comment(self, comment_id: str, content: str) -> Optional[Comment]: return None comment.content = content - comment.updated_at = datetime.utcnow() + comment.updated_at = datetime.now(timezone.utc) return comment @@ -232,7 +232,7 @@ def delete_comment(self, comment_id: str, soft: bool = True) -> bool: if soft: comment.is_deleted = True comment.content = '[deleted]' - comment.updated_at = datetime.utcnow() + comment.updated_at = datetime.now(timezone.utc) else: del self.comments[comment_id] diff --git a/test-apps/02-flask-blog-api/test_api.py b/test-apps/02-flask-blog-api/test_api.py index 288b63d..49225a2 100644 --- a/test-apps/02-flask-blog-api/test_api.py +++ b/test-apps/02-flask-blog-api/test_api.py @@ -233,9 +233,10 @@ def test_update_own_post(client, tokens): def test_update_others_post_as_author_fails(client, tokens): - """Test that author cannot update other author's posts.""" - # Try to update post 1 (created by seed data) - response = client.put('/posts/1', + """Test that author cannot update another author's posts.""" + # Post 4 is created by jane_author in seed data; john_author (tokens['author']) + # does not own it, so this should be rejected. + response = client.put('/posts/4', headers=get_auth_header(tokens['author']), json={ 'title': 'Trying to hack' diff --git a/test-apps/03-fastapi-blog-api/dependencies.py b/test-apps/03-fastapi-blog-api/dependencies.py index 4d44b82..094e9cb 100644 --- a/test-apps/03-fastapi-blog-api/dependencies.py +++ b/test-apps/03-fastapi-blog-api/dependencies.py @@ -25,6 +25,9 @@ def create_post( logger = logging.getLogger(__name__) +# Roles that can perform ownership-protected actions on ANY resource +OWNERSHIP_OVERRIDE_ROLES: frozenset = frozenset({"admin", "editor"}) + # --------------------------------------------------------------------------- # App-state accessors (thin DI wrappers over request.app.state) @@ -72,6 +75,70 @@ def __init__( self.resource_type = resource_type self.check_ownership = check_ownership + def _resolve_resource( + self, + request: Request, + storage: InMemoryStorage, + ): + """Fetch the resource from storage based on path params, or return None.""" + resource_id = ( + request.path_params.get("post_id") + or request.path_params.get("comment_id") + or request.path_params.get("id") + ) + if not resource_id: + return None + + resource_id = str(resource_id) + if self.resource_type and "post" in self.resource_type: + resource = storage.get_post(resource_id) + elif self.resource_type and "comment" in self.resource_type: + resource = storage.get_comment(resource_id) + else: + resource = None + + if resource is None: + label = (self.resource_type or "resource").capitalize() + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail={"error": "Not found", "message": f"{label} not found"}, + ) + return resource + + @staticmethod + def _build_context(current_user: dict, resource) -> dict: + """Build the ABAC context dict for the RBAC engine.""" + context: dict = { + "user_id": current_user["user_id"], + "username": current_user["username"], + "role": current_user["role"], + } + if resource is not None: + owner_id = getattr(resource, "author_id", None) or getattr(resource, "user_id", None) + context["resource_owner"] = owner_id + context["is_owner"] = owner_id == current_user["user_id"] + return context + + def _raise_forbidden(self, check_ownership: bool, resource, context: dict) -> None: + """Raise the appropriate 403 HTTPException.""" + if check_ownership and resource and not context.get("is_owner"): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={ + "error": "Forbidden", + "message": "You can only modify your own content", + "reason": "ownership_required", + }, + ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail={ + "error": "Forbidden", + "message": f"You do not have permission to {self.action} {self.resource_type}", + "reason": "permission_denied", + }, + ) + def __call__( self, request: Request, @@ -80,40 +147,10 @@ def __call__( rbac=Depends(get_rbac), ) -> None: """Perform the RBAC check; raises HTTPException on failure.""" - resource = None - - if self.check_ownership: - resource_id = ( - request.path_params.get("post_id") - or request.path_params.get("comment_id") - or request.path_params.get("id") - ) - if resource_id: - resource_id = str(resource_id) - if self.resource_type and "post" in self.resource_type: - resource = storage.get_post(resource_id) - elif self.resource_type and "comment" in self.resource_type: - resource = storage.get_comment(resource_id) - - if resource is None: - label = (self.resource_type or "resource").capitalize() - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail={"error": "Not found", "message": f"{label} not found"}, - ) + resource = self._resolve_resource(request, storage) if self.check_ownership else None + context = self._build_context(current_user, resource) try: - context: dict = { - "user_id": current_user["user_id"], - "username": current_user["username"], - "role": current_user["role"], - } - - if resource is not None: - owner_id = getattr(resource, "author_id", None) or getattr(resource, "user_id", None) - context["resource_owner"] = owner_id - context["is_owner"] = owner_id == current_user["user_id"] - rbac_user_id = f"user_{current_user['user_id']}" can_access = rbac.can( user_id=rbac_user_id, @@ -121,25 +158,19 @@ def __call__( resource=self.resource_type, context=context, ) - if not can_access: - if self.check_ownership and resource and not context.get("is_owner"): - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail={ - "error": "Forbidden", - "message": "You can only modify your own content", - "reason": "ownership_required", - }, - ) - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail={ - "error": "Forbidden", - "message": f"You do not have permission to {self.action} {self.resource_type}", - "reason": "permission_denied", - }, - ) + self._raise_forbidden(self.check_ownership, resource, context) + + # Ownership enforcement: RBAC may allow the action by role, but if the + # caller doesn't own the resource and their role cannot override ownership, + # block the request explicitly. + if ( + self.check_ownership + and resource + and not context.get("is_owner") + and current_user.get("role", "") not in OWNERSHIP_OVERRIDE_ROLES + ): + self._raise_forbidden(True, resource, context) except HTTPException: raise # Re-raise HTTP exceptions untouched diff --git a/test-apps/03-fastapi-blog-api/seed_data.py b/test-apps/03-fastapi-blog-api/seed_data.py index e155338..3e331f0 100644 --- a/test-apps/03-fastapi-blog-api/seed_data.py +++ b/test-apps/03-fastapi-blog-api/seed_data.py @@ -6,6 +6,13 @@ from datetime import datetime, timezone from models import PostStatus +# --------------------------------------------------------------------------- +# Post-title constants (reused across seeding and comment linking) +# --------------------------------------------------------------------------- +TITLE_RBAC_INTRO = "Getting Started with RBAC" +TITLE_FASTAPI_REST = "Building REST APIs with FastAPI" +TITLE_SECURITY = "Security Best Practices for Web APIs" + def load_seed_data(storage, rbac, auth_manager) -> None: """Populate storage and RBAC with demo data.""" @@ -13,14 +20,14 @@ def load_seed_data(storage, rbac, auth_manager) -> None: print("Loading seed data...") # ------------------------------------------------------------------ - # Users + # Users (passwords are intentional demo-only values) # ------------------------------------------------------------------ users_data = [ - {"username": "admin", "email": "admin@blogapi.com", "password": "admin123", "role": "admin"}, - {"username": "editor", "email": "editor@blogapi.com", "password": "editor123", "role": "editor"}, - {"username": "john_author", "email": "john@blogapi.com", "password": "author123", "role": "author"}, - {"username": "jane_author", "email": "jane@blogapi.com", "password": "author123", "role": "author"}, - {"username": "bob_reader", "email": "bob@blogapi.com", "password": "reader123", "role": "reader"}, + {"username": "admin", "email": "admin@blogapi.com", "password": "admin123", "role": "admin"}, # NOSONAR + {"username": "editor", "email": "editor@blogapi.com", "password": "editor123", "role": "editor"}, # NOSONAR + {"username": "john_author", "email": "john@blogapi.com", "password": "author123", "role": "author"}, # NOSONAR + {"username": "jane_author", "email": "jane@blogapi.com", "password": "author123", "role": "author"}, # NOSONAR + {"username": "bob_reader", "email": "bob@blogapi.com", "password": "reader123", "role": "reader"}, # NOSONAR ] created_users: dict = {} @@ -43,7 +50,7 @@ def load_seed_data(storage, rbac, auth_manager) -> None: # ------------------------------------------------------------------ posts_data = [ { - "title": "Getting Started with RBAC", + "title": TITLE_RBAC_INTRO, "content": "Role-Based Access Control (RBAC) is a powerful authorization model. " "In this post we explore the fundamentals and key concepts including " "Users, Roles, Permissions, and Resources.", @@ -52,7 +59,7 @@ def load_seed_data(storage, rbac, auth_manager) -> None: "tags": ["rbac", "security", "tutorial"], }, { - "title": "Building REST APIs with FastAPI", + "title": TITLE_FASTAPI_REST, "content": "FastAPI is a modern, high-performance Python framework for building APIs. " "It features automatic OpenAPI docs, Pydantic models, and native async support.", "author": "john_author", @@ -76,7 +83,7 @@ def load_seed_data(storage, rbac, auth_manager) -> None: "tags": ["content", "management"], }, { - "title": "Security Best Practices for Web APIs", + "title": TITLE_SECURITY, "content": "Securing your API requires more than just authentication. " "We cover authorization, rate limiting, input validation, and audit logging.", "author": "editor", @@ -103,13 +110,13 @@ def load_seed_data(storage, rbac, auth_manager) -> None: # Comments # ------------------------------------------------------------------ comments_data = [ - {"post": "Getting Started with RBAC", "author": "bob_reader", "content": "Great intro to RBAC!"}, - {"post": "Getting Started with RBAC", "author": "jane_author", "content": "Really well explained."}, - {"post": "Building REST APIs with FastAPI","author": "bob_reader", "content": "FastAPI is amazing, thanks!"}, - {"post": "Building REST APIs with FastAPI","author": "editor", "content": "Good overview of FastAPI."}, + {"post": TITLE_RBAC_INTRO, "author": "bob_reader", "content": "Great intro to RBAC!"}, + {"post": TITLE_RBAC_INTRO, "author": "jane_author", "content": "Really well explained."}, + {"post": TITLE_FASTAPI_REST, "author": "bob_reader", "content": "FastAPI is amazing, thanks!"}, + {"post": TITLE_FASTAPI_REST, "author": "editor", "content": "Good overview of FastAPI."}, {"post": "Managing Blog Content at Scale","author": "john_author","content": "Very useful editorial tips."}, - {"post": "Security Best Practices for Web APIs","author": "john_author","content": "Great coverage of auth patterns."}, - {"post": "Security Best Practices for Web APIs","author": "bob_reader", "content": "Can you cover OAuth next?"}, + {"post": TITLE_SECURITY, "author": "john_author", "content": "Great coverage of auth patterns."}, + {"post": TITLE_SECURITY, "author": "bob_reader", "content": "Can you cover OAuth next?"}, ] for cd in comments_data: @@ -124,7 +131,7 @@ def load_seed_data(storage, rbac, auth_manager) -> None: ) stats = storage.get_stats() - print(f"\nSeed data loaded successfully!") + print("\nSeed data loaded successfully!") print(f" Users: {stats.total_users}") print(f" Posts: {stats.total_posts} ({stats.published_posts} published)") print(f" Comments: {stats.total_comments}") diff --git a/test-apps/03-fastapi-blog-api/test_api.py b/test-apps/03-fastapi-blog-api/test_api.py index 271ab6d..79a5ffa 100644 --- a/test-apps/03-fastapi-blog-api/test_api.py +++ b/test-apps/03-fastapi-blog-api/test_api.py @@ -41,11 +41,11 @@ def client(app): def registered_users(client): """Register all personas and return {username: token, ...}.""" credentials = { - "admin": {"username": "test_admin", "email": "tadmin@test.com", "password": "Admin1234!"}, - "editor": {"username": "test_editor", "email": "teditor@test.com", "password": "Editor123!"}, - "author1": {"username": "test_author1", "email": "tauth1@test.com", "password": "Author123!"}, - "author2": {"username": "test_author2", "email": "tauth2@test.com", "password": "Author456!"}, - "reader": {"username": "test_reader", "email": "treader@test.com", "password": "Reader123!"}, + "admin": {"username": "test_admin", "email": "tadmin@test.com", "password": "Admin1234!"}, # NOSONAR + "editor": {"username": "test_editor", "email": "teditor@test.com", "password": "Editor123!"}, # NOSONAR + "author1": {"username": "test_author1", "email": "tauth1@test.com", "password": "Author123!"}, # NOSONAR + "author2": {"username": "test_author2", "email": "tauth2@test.com", "password": "Author456!"}, # NOSONAR + "reader": {"username": "test_reader", "email": "treader@test.com", "password": "Reader123!"}, # NOSONAR } tokens = {} user_ids = {} @@ -117,7 +117,7 @@ def test_register_success(self, client): r = client.post("/auth/register", json={ "username": "newuser_auth", "email": "newuser_auth@test.com", - "password": "NewPass123!", + "password": "NewPass123!", # NOSONAR }) assert r.status_code == 201 body = r.json() @@ -126,12 +126,11 @@ def test_register_success(self, client): assert body["user"]["role"] == "reader" def test_register_duplicate_username(self, client, registered_users): - token = registered_users["tokens"]["reader"] # reader already registered; try same username r = client.post("/auth/register", json={ "username": "test_reader", "email": "other@test.com", - "password": "Pass1234!", + "password": "Pass1234!", # NOSONAR }) assert r.status_code == 409 @@ -139,7 +138,7 @@ def test_register_duplicate_email(self, client, registered_users): r = client.post("/auth/register", json={ "username": "different_name", "email": "treader@test.com", - "password": "Pass1234!", + "password": "Pass1234!", # NOSONAR }) assert r.status_code == 409 @@ -147,19 +146,19 @@ def test_login_success(self, client): client.post("/auth/register", json={ "username": "login_test_user", "email": "logintest@test.com", - "password": "LoginPass1!", + "password": "LoginPass1!", # NOSONAR }) - r = client.post("/auth/login", json={"username": "login_test_user", "password": "LoginPass1!"}) + r = client.post("/auth/login", json={"username": "login_test_user", "password": "LoginPass1!"}) # NOSONAR assert r.status_code == 200 body = r.json() assert "access_token" in body def test_login_wrong_password(self, client, registered_users): - r = client.post("/auth/login", json={"username": "test_reader", "password": "wrongpassword"}) + r = client.post("/auth/login", json={"username": "test_reader", "password": "wrongpassword"}) # NOSONAR assert r.status_code == 401 def test_login_nonexistent_user(self, client): - r = client.post("/auth/login", json={"username": "ghost_user", "password": "whatever"}) + r = client.post("/auth/login", json={"username": "ghost_user", "password": "whatever"}) # NOSONAR assert r.status_code == 401 def test_get_me(self, client, registered_users): diff --git a/tests/conftest.py b/tests/conftest.py index b3df424..3593883 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -29,10 +29,10 @@ def sample_user(domain): """Sample user fixture.""" return User( id="user1", - username="testuser", + name="testuser", email="test@example.com", domain=domain, - metadata={"department": "engineering"} + attributes={"department": "engineering"} ) @@ -52,9 +52,8 @@ def sample_permission(domain): """Sample permission fixture.""" return Permission( id="perm1", - action="read", - resource="document", - domain=domain + resource_type="document", + action="read" ) diff --git a/tests/integration/test_complete_workflows.py b/tests/integration/test_complete_workflows.py index 4ecfa47..ca99724 100644 --- a/tests/integration/test_complete_workflows.py +++ b/tests/integration/test_complete_workflows.py @@ -22,9 +22,9 @@ def test_basic_user_role_permission_flow(self): # Step 1: Create a user user = User( id="alice", - username="alice_admin", + name="alice_admin", email="alice@example.com", - metadata={"department": "engineering"} + attributes={"department": "engineering"} ) rbac._storage.store_user(user) @@ -32,7 +32,7 @@ def test_basic_user_role_permission_flow(self): resource = Resource( id="doc_1", type="document", - metadata={"classification": "public"} + attributes={"classification": "public"} ) rbac._storage.store_resource(resource) @@ -40,12 +40,12 @@ def test_basic_user_role_permission_flow(self): read_perm = Permission( id="perm_read", action="read", - resource=resource + resource_type=resource.type ) write_perm = Permission( id="perm_write", action="write", - resource=resource + resource_type=resource.type ) rbac._storage.store_permission(read_perm) rbac._storage.store_permission(write_perm) @@ -78,7 +78,7 @@ def test_role_hierarchy_permission_inheritance(self): # Create user user = User( id="bob", - username="bob_user", + name="bob_user", email="bob@example.com" ) rbac._storage.store_user(user) @@ -88,9 +88,9 @@ def test_role_hierarchy_permission_inheritance(self): rbac._storage.store_resource(resource) # Create permissions - read_perm = Permission(id="perm_read", action="read", resource=resource) - write_perm = Permission(id="perm_write", action="write", resource=resource) - admin_perm = Permission(id="perm_admin", action="admin", resource=resource) + read_perm = Permission(id="perm_read", action="read", resource_type=resource.type) + write_perm = Permission(id="perm_write", action="write", resource_type=resource.type) + admin_perm = Permission(id="perm_admin", action="admin", resource_type=resource.type) rbac._storage.store_permission(read_perm) rbac._storage.store_permission(write_perm) @@ -133,7 +133,7 @@ def test_multi_role_assignment_accumulation(self): rbac = RBAC(storage='memory') # Create user - user = User(id="charlie", username="charlie", email="charlie@example.com") + user = User(id="charlie", name="charlie", email="charlie@example.com") rbac._storage.store_user(user) # Create resources @@ -143,9 +143,9 @@ def test_multi_role_assignment_accumulation(self): rbac._storage.store_resource(db_resource) # Create permissions for different resources - doc_read = Permission(id="perm_doc_read", action="read", resource=doc_resource) - db_read = Permission(id="perm_db_read", action="read", resource=db_resource) - db_write = Permission(id="perm_db_write", action="write", resource=db_resource) + doc_read = Permission(id="perm_doc_read", action="read", resource_type=doc_resource.type) + db_read = Permission(id="perm_db_read", action="read", resource_type=db_resource.type) + db_write = Permission(id="perm_db_write", action="write", resource_type=db_resource.type) rbac._storage.store_permission(doc_read) rbac._storage.store_permission(db_read) @@ -187,7 +187,7 @@ def test_user_lifecycle_and_status_changes(self): # Create active user with permissions user = User( id="dave", - username="dave", + name="dave", email="dave@example.com", status=EntityStatus.ACTIVE ) @@ -196,7 +196,7 @@ def test_user_lifecycle_and_status_changes(self): resource = Resource(id="file_1", type="file") rbac._storage.store_resource(resource) - permission = Permission(id="perm_read", action="read", resource=resource) + permission = Permission(id="perm_read", action="read", resource_type=resource.type) rbac._storage.store_permission(permission) role = Role(id="role_reader", name="Reader", permissions={permission}) @@ -212,7 +212,7 @@ def test_user_lifecycle_and_status_changes(self): # Suspend user suspended_user = User( id="dave", - username="dave", + name="dave", email="dave@example.com", status=EntityStatus.SUSPENDED ) @@ -224,7 +224,7 @@ def test_user_lifecycle_and_status_changes(self): # Reactivate user active_user = User( id="dave", - username="dave", + name="dave", email="dave@example.com", status=EntityStatus.ACTIVE ) @@ -238,8 +238,8 @@ def test_role_modification_affects_users(self): rbac = RBAC(storage='memory') # Create two users - user1 = User(id="user1", username="user1", email="user1@example.com") - user2 = User(id="user2", username="user2", email="user2@example.com") + user1 = User(id="user1", name="user1", email="user1@example.com") + user2 = User(id="user2", name="user2", email="user2@example.com") rbac._storage.store_user(user1) rbac._storage.store_user(user2) @@ -248,7 +248,7 @@ def test_role_modification_affects_users(self): rbac._storage.store_resource(resource) # Create initial permission - read_perm = Permission(id="perm_read", action="read", resource=resource) + read_perm = Permission(id="perm_read", action="read", resource_type=resource.type) rbac._storage.store_permission(read_perm) # Create role with limited permission @@ -274,7 +274,7 @@ def test_role_modification_affects_users(self): assert rbac.can("user2", "write", "service") is False # Add write permission to role - write_perm = Permission(id="perm_write", action="write", resource=resource) + write_perm = Permission(id="perm_write", action="write", resource_type=resource.type) rbac._storage.store_permission(write_perm) updated_role = Role( @@ -295,13 +295,13 @@ def test_domain_isolation(self): # Create users in different domains user_domain_a = User( id="user_a", - username="user_a", + name="user_a", email="user@domain-a.com", domain="domain-a" ) user_domain_b = User( id="user_b", - username="user_b", + name="user_b", email="user@domain-b.com", domain="domain-b" ) @@ -315,8 +315,8 @@ def test_domain_isolation(self): rbac._storage.store_resource(resource_b) # Create permissions - perm_a = Permission(id="perm_a", action="read", resource=resource_a) - perm_b = Permission(id="perm_b", action="read", resource=resource_b) + perm_a = Permission(id="perm_a", action="read", resource_type=resource_a.type) + perm_b = Permission(id="perm_b", action="read", resource_type=resource_b.type) rbac._storage.store_permission(perm_a) rbac._storage.store_permission(perm_b) @@ -359,7 +359,7 @@ def test_authorization_with_many_roles(self): rbac = RBAC(storage='memory') # Create user - user = User(id="power_user", username="power_user", email="power@example.com") + user = User(id="power_user", name="power_user", email="power@example.com") rbac._storage.store_user(user) # Create resource @@ -371,7 +371,7 @@ def test_authorization_with_many_roles(self): permission = Permission( id=f"perm_{i}", action=f"action_{i}", - resource=resource + resource_type=resource.type ) rbac._storage.store_permission(permission) @@ -397,7 +397,7 @@ def test_authorization_with_deep_hierarchy(self): rbac = RBAC(storage='memory', enable_hierarchy=True) # Create user - user = User(id="hierarchical_user", username="h_user", email="h@example.com") + user = User(id="hierarchical_user", name="h_user", email="h@example.com") rbac._storage.store_user(user) # Create resource @@ -409,7 +409,7 @@ def test_authorization_with_deep_hierarchy(self): permission = Permission( id=f"perm_level_{i}", action=f"level_{i}_action", - resource=resource + resource_type=resource.type ) rbac._storage.store_permission(permission) diff --git a/tests/property/test_authorization_invariants.py b/tests/property/test_authorization_invariants.py index 60595c7..71d17e1 100644 --- a/tests/property/test_authorization_invariants.py +++ b/tests/property/test_authorization_invariants.py @@ -60,7 +60,7 @@ def test_no_permissions_means_no_access(self, user_id, action, resource_type): # Create user with no roles user = User( id=user_id, - username=user_id, + name=user_id, email=f"{user_id}@example.com" ) rbac._storage.store_user(user) @@ -84,7 +84,7 @@ def test_explicit_permission_grants_access(self, user_id, action, resource_type) # Create user user = User( id=user_id, - username=user_id, + name=user_id, email=f"{user_id}@example.com" ) rbac._storage.store_user(user) @@ -100,7 +100,7 @@ def test_explicit_permission_grants_access(self, user_id, action, resource_type) permission = Permission( id=f"perm_{user_id}_{action}", action=action, - resource=resource + resource_type=resource.type ) rbac._storage.store_permission(permission) @@ -140,7 +140,7 @@ def test_permission_is_action_specific(self, user_id, action1, action2): # Create user user = User( id=user_id, - username=user_id, + name=user_id, email=f"{user_id}@example.com" ) rbac._storage.store_user(user) @@ -156,7 +156,7 @@ def test_permission_is_action_specific(self, user_id, action1, action2): permission = Permission( id=f"perm_{action1}", action=action1, - resource=resource + resource_type=resource.type ) rbac._storage.store_permission(permission) @@ -194,7 +194,7 @@ def test_suspended_user_has_no_access(self, user_id, action, resource_type): # Create suspended user user = User( id=user_id, - username=user_id, + name=user_id, email=f"{user_id}@example.com", status=EntityStatus.SUSPENDED ) @@ -211,7 +211,7 @@ def test_suspended_user_has_no_access(self, user_id, action, resource_type): permission = Permission( id=f"perm_{action}", action=action, - resource=resource + resource_type=resource.type ) rbac._storage.store_permission(permission) @@ -247,7 +247,7 @@ def test_authorization_is_deterministic(self, user_id, action, resource_type): # Create user user = User( id=user_id, - username=user_id, + name=user_id, email=f"{user_id}@example.com" ) rbac._storage.store_user(user) @@ -272,7 +272,7 @@ def test_multiple_roles_accumulate_permissions(self, user_id, num_roles): # Create user user = User( id=user_id, - username=user_id, + name=user_id, email=f"{user_id}@example.com" ) rbac._storage.store_user(user) @@ -294,7 +294,7 @@ def test_multiple_roles_accumulate_permissions(self, user_id, num_roles): permission = Permission( id=f"perm_{i}", action=action, - resource=resource + resource_type=resource.type ) rbac._storage.store_permission(permission) diff --git a/tests/test_models.py b/tests/test_models.py index 11fa018..a381cb7 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -12,12 +12,12 @@ def test_create_user(self, domain): """Test user creation.""" user = User( id="user1", - username="testuser", + name="testuser", email="test@example.com", domain=domain ) assert user.id == "user1" - assert user.username == "testuser" + assert user.name == "testuser" assert user.email == "test@example.com" assert user.domain == domain @@ -25,18 +25,18 @@ def test_user_with_metadata(self, domain): """Test user with metadata.""" user = User( id="user1", - username="testuser", + name="testuser", email="test@example.com", domain=domain, - metadata={"department": "engineering", "level": 3} + attributes={"department": "engineering", "level": 3} ) - assert user.metadata["department"] == "engineering" - assert user.metadata["level"] == 3 + assert user.attributes["department"] == "engineering" + assert user.attributes["level"] == 3 def test_user_equality(self, domain): """Test user equality.""" - user1 = User(id="user1", username="test", email="test@example.com", domain=domain) - user2 = User(id="user1", username="test", email="test@example.com", domain=domain) + user1 = User(id="user1", name="test", email="test@example.com", domain=domain) + user2 = User(id="user1", name="test", email="test@example.com", domain=domain) assert user1.id == user2.id assert user1.domain == user2.domain @@ -78,22 +78,19 @@ def test_create_permission(self, domain): """Test permission creation.""" perm = Permission( id="perm1", - action="read", - resource="document", - domain=domain + resource_type="document", + action="read" ) assert perm.id == "perm1" assert perm.action == "read" - assert perm.resource == "document" - assert perm.domain == domain + assert perm.resource_type == "document" def test_permission_with_conditions(self, domain): """Test permission with ABAC conditions.""" perm = Permission( id="perm1", + resource_type="document", action="read", - resource="document", - domain=domain, conditions={"department": {"==": "engineering"}} ) assert perm.conditions is not None diff --git a/tests/test_storage.py b/tests/test_storage.py index 3a24317..0463bb6 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -24,8 +24,8 @@ def test_get_user(self, storage, sample_user): def test_list_users(self, storage, domain): """Test listing users.""" - user1 = User(id="user1", username="user1", email="user1@example.com", domain=domain) - user2 = User(id="user2", username="user2", email="user2@example.com", domain=domain) + user1 = User(id="user1", name="user1", email="user1@example.com", domain=domain) + user2 = User(id="user2", name="user2", email="user2@example.com", domain=domain) storage.create_user(user1) storage.create_user(user2) @@ -55,8 +55,8 @@ def test_create_role(self, storage, sample_role): def test_domain_isolation(self, storage): """Test that domains are isolated.""" - user1 = User(id="user1", username="user1", email="user1@example.com", domain="domain1") - user2 = User(id="user1", username="user1", email="user1@example.com", domain="domain2") + user1 = User(id="user1", name="user1", email="user1@example.com", domain="domain1") + user2 = User(id="user1", name="user1", email="user1@example.com", domain="domain2") storage.create_user(user1) storage.create_user(user2) @@ -73,7 +73,7 @@ def test_domain_isolation(self, storage): def test_batch_create_users(self, storage, domain): """Test batch user creation.""" users = [ - User(id=f"user{i}", username=f"user{i}", email=f"user{i}@example.com", domain=domain) + User(id=f"user{i}", name=f"user{i}", email=f"user{i}@example.com", domain=domain) for i in range(5) ] created = storage.batch_create_users(users) @@ -81,7 +81,7 @@ def test_batch_create_users(self, storage, domain): def test_get_users_by_role(self, storage, domain): """Test getting users by role.""" - user = User(id="user1", username="user1", email="user1@example.com", domain=domain) + user = User(id="user1", name="user1", email="user1@example.com", domain=domain) role = Role(id="role1", name="admin", description="Admin", domain=domain) storage.create_user(user)