backend/api/health.py (+71, -0)

@@ -4,6 +4,7 @@

from fastapi import APIRouter

from middleware.monitoring import query_performance_tracker
from services.database_service import get_db_service
from services.redis_service import redis_service
from services.storage_service import storage_service
@@ -77,3 +78,73 @@ async def health_check() -> Dict[str, Any]:
},
},
}


@router.get("/metrics")
async def get_performance_metrics() -> Dict[str, Any]:
"""Get performance metrics for monitoring and bottleneck identification"""

try:
# Get operation performance statistics
operations_summary = query_performance_tracker.get_all_operations_summary()

# Calculate overall statistics
total_operations = sum(
stats["call_count"] for stats in operations_summary.values()
)
total_time = sum(stats["total_time"] for stats in operations_summary.values())
avg_time_overall = total_time / total_operations if total_operations > 0 else 0

# Identify slowest operations
slowest_operations = sorted(
[
{
"operation": operation,
"avg_time": stats["avg_time"],
"call_count": stats["call_count"],
"total_time": stats["total_time"],
}
for operation, stats in operations_summary.items()
],
key=lambda x: x["avg_time"],
reverse=True,
        )[:5]  # Top 5 slowest

# Identify bottlenecks (operations taking > 2 seconds on average)
bottlenecks = [op for op in slowest_operations if op["avg_time"] > 2.0]

return {
"success": True,
"data": {
"timestamp": datetime.utcnow().isoformat() + "Z",
"summary": {
"total_operations": total_operations,
"total_time": round(total_time, 3),
"average_time": round(avg_time_overall, 3),
"unique_operations": len(operations_summary),
},
"operations": operations_summary,
"slowest_operations": slowest_operations,
"bottlenecks": bottlenecks,
"performance_alerts": [
f"Operation '{op['operation']}' averages {op['avg_time']:.3f}s per call"
for op in bottlenecks
],
},
}

except Exception as e:
return {
"success": False,
"error": f"Failed to retrieve performance metrics: {str(e)}",
"data": {
"timestamp": datetime.utcnow().isoformat() + "Z",
"summary": {},
"operations": {},
"slowest_operations": [],
"bottlenecks": [],
"performance_alerts": [],
},
}
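
A quick sanity check for the new endpoint (a minimal sketch, assuming the service runs locally on port 8000, the health router is mounted without a path prefix, and httpx is installed; the payload keys mirror the handler above):

import httpx

# Query the new metrics endpoint and surface any bottleneck alerts
resp = httpx.get("http://localhost:8000/metrics")
payload = resp.json()

if payload["success"]:
    summary = payload["data"]["summary"]
    print(f"{summary['total_operations']} operations, avg {summary['average_time']}s")
    for alert in payload["data"]["performance_alerts"]:
        print("ALERT:", alert)
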
backend/main.py (+4, -0)

@@ -12,6 +12,7 @@
from api.health import router as health_router
from api.middleware.cors import setup_cors
from api.projects import router as projects_router
from middleware.monitoring import PerformanceMonitoringMiddleware

# Create FastAPI application
app = FastAPI(
@@ -25,6 +26,9 @@
# Setup CORS middleware
setup_cors(app)

# Add performance monitoring middleware
app.add_middleware(PerformanceMonitoringMiddleware)

# Include routers
app.include_router(health_router)
app.include_router(auth_router)
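
One way to confirm the middleware is wired in (a minimal sketch, assuming a /health route exists and the TestClient extras are installed): every response should now carry the X-Process-Time header set in dispatch().

from fastapi.testclient import TestClient

from main import app

client = TestClient(app)
response = client.get("/health")

# The middleware stamps each response with its processing time in seconds
print(response.headers["X-Process-Time"])
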
backend/middleware/monitoring.py (+241, -0)

@@ -0,0 +1,241 @@
import logging
import time
from typing import Any, Dict, List, Optional

try:
from fastapi import Request, Response
    from starlette.middleware.base import BaseHTTPMiddleware

FASTAPI_AVAILABLE = True
except ImportError:
# FastAPI not available in test environment
FASTAPI_AVAILABLE = False
Request = None
Response = None
BaseHTTPMiddleware = None

logger = logging.getLogger(__name__)


if FASTAPI_AVAILABLE:

class PerformanceMonitoringMiddleware(BaseHTTPMiddleware):
"""Middleware to monitor API performance and response times"""

def __init__(self, app, enable_detailed_logging: bool = True):
super().__init__(app)
self.enable_detailed_logging = enable_detailed_logging
self.metrics: Dict[str, List[float]] = {}

async def dispatch(self, request: Request, call_next):
"""Monitor request processing time"""
start_time = time.time()

# Process the request
response = await call_next(request)

# Calculate processing time
process_time = time.time() - start_time

# Track metrics
endpoint = f"{request.method} {request.url.path}"
self._record_metric(endpoint, process_time)

# Add performance header
response.headers["X-Process-Time"] = str(process_time)

# Log performance if enabled
if self.enable_detailed_logging:
self._log_performance(request, response, process_time)

return response

def _record_metric(self, endpoint: str, process_time: float):
"""Record performance metric for endpoint"""
if endpoint not in self.metrics:
self.metrics[endpoint] = []

# Keep only last 100 measurements to prevent memory bloat
if len(self.metrics[endpoint]) >= 100:
self.metrics[endpoint].pop(0)

self.metrics[endpoint].append(process_time)

def _log_performance(
self, request: Request, response: Response, process_time: float
):
"""Log performance information"""
endpoint = f"{request.method} {request.url.path}"
status_code = response.status_code

# Determine log level based on performance and status
if process_time > 5.0: # Very slow requests
log_level = logging.WARNING
performance_indicator = "SLOW"
elif process_time > 2.0: # Moderately slow requests
log_level = logging.INFO
performance_indicator = "MODERATE"
else:
log_level = logging.DEBUG
performance_indicator = "FAST"

# Log with appropriate level
logger.log(
log_level,
f"[{performance_indicator}] {endpoint} - {status_code} - {process_time:.3f}s",
)

# Log additional warning for very slow requests
if process_time > 5.0:
avg_time = self.get_average_response_time(endpoint)
logger.warning(
f"Performance bottleneck detected: {endpoint} took {process_time:.3f}s "
f"(avg: {avg_time:.3f}s)"
)

def get_metrics_summary(self) -> Dict[str, Dict[str, float]]:
"""Get performance metrics summary for all endpoints"""
summary = {}

for endpoint, times in self.metrics.items():
if times:
summary[endpoint] = {
"avg_time": sum(times) / len(times),
"min_time": min(times),
"max_time": max(times),
"request_count": len(times),
"total_time": sum(times),
}

return summary

def get_average_response_time(self, endpoint: str) -> float:
"""Get average response time for specific endpoint"""
if endpoint in self.metrics and self.metrics[endpoint]:
return sum(self.metrics[endpoint]) / len(self.metrics[endpoint])
return 0.0

        def get_slowest_endpoints(self, limit: int = 5) -> List[Dict[str, Any]]:
"""Get the slowest endpoints by average response time"""
summary = self.get_metrics_summary()

sorted_endpoints = sorted(
summary.items(), key=lambda x: x[1]["avg_time"], reverse=True
)

return [
{
"endpoint": endpoint,
"avg_time": metrics["avg_time"],
"request_count": metrics["request_count"],
}
for endpoint, metrics in sorted_endpoints[:limit]
]

def clear_metrics(self):
"""Clear all collected metrics"""
self.metrics.clear()
logger.info("Performance metrics cleared")

else:
# Stub class when FastAPI is not available
class PerformanceMonitoringMiddleware:
def __init__(self, app, enable_detailed_logging: bool = True):
self.app = app

async def __call__(self, scope, receive, send):
# Pass through to the app without monitoring in test mode
await self.app(scope, receive, send)


class QueryPerformanceTracker:
"""Track performance of specific operations like database queries and AI calls"""

def __init__(self):
self.operation_metrics: Dict[str, List[float]] = {}

def track_operation(self, operation_name: str, duration: float):
"""Track duration of a specific operation"""
if operation_name not in self.operation_metrics:
self.operation_metrics[operation_name] = []

# Keep only last 50 measurements per operation
if len(self.operation_metrics[operation_name]) >= 50:
self.operation_metrics[operation_name].pop(0)

self.operation_metrics[operation_name].append(duration)

# Log slow operations
if duration > 3.0:
avg_duration = sum(self.operation_metrics[operation_name]) / len(
self.operation_metrics[operation_name]
)
logger.warning(
f"Slow operation detected: {operation_name} took {duration:.3f}s "
f"(avg: {avg_duration:.3f}s)"
)

def get_operation_stats(self, operation_name: str) -> Optional[Dict[str, float]]:
"""Get statistics for a specific operation"""
if (
operation_name not in self.operation_metrics
or not self.operation_metrics[operation_name]
):
return None

times = self.operation_metrics[operation_name]
return {
"avg_time": sum(times) / len(times),
"min_time": min(times),
"max_time": max(times),
"call_count": len(times),
"total_time": sum(times),
}

def get_all_operations_summary(self) -> Dict[str, Dict[str, float]]:
"""Get summary of all tracked operations"""
summary = {}
for operation_name in self.operation_metrics:
stats = self.get_operation_stats(operation_name)
if stats:
summary[operation_name] = stats
return summary


# Global tracker instance for operation monitoring
query_performance_tracker = QueryPerformanceTracker()


def track_performance(operation_name: str):
"""Decorator to track performance of functions"""

    def decorator(func):
        import functools
        import inspect

        if inspect.iscoroutinefunction(func):
            # Async function wrapper; functools.wraps keeps the wrapped
            # function's name, docstring, and signature intact
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
return result
finally:
duration = time.time() - start_time
query_performance_tracker.track_operation(operation_name, duration)

return async_wrapper
else:
            # Sync function wrapper
            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.time() - start_time
query_performance_tracker.track_operation(operation_name, duration)

return sync_wrapper

return decorator
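
For context, the decorator is intended to wrap service-level calls so their durations land in the global tracker (a minimal sketch; generate_summary is a hypothetical function, not part of this PR):

from middleware.monitoring import query_performance_tracker, track_performance

@track_performance("ai_summary_generation")
async def generate_summary(text: str) -> str:
    # ... call the AI service here ...
    return text[:100]

# After a few calls, aggregated stats are available from the tracker,
# and they also feed the /metrics endpoint added in this PR:
stats = query_performance_tracker.get_operation_stats("ai_summary_generation")
# -> {"avg_time": ..., "min_time": ..., "max_time": ..., "call_count": ..., "total_time": ...}
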
backend/services/database_service.py (+2, -0)

@@ -6,6 +6,7 @@
from sqlalchemy.orm import sessionmaker

from models.base import Base
from middleware.monitoring import track_performance

logger = logging.getLogger(__name__)

@@ -44,6 +45,7 @@ def reconnect(self):
"""Force a reconnection to the database."""
self.connect()

@track_performance("database_health_check")
def health_check(self) -> Dict[str, Any]:
"""Check database health"""
try:
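
With health_check decorated, every call now feeds the "database_health_check" series, so slow database probes surface in /metrics. Reading the stats directly (a sketch; values are illustrative):

from middleware.monitoring import query_performance_tracker

stats = query_performance_tracker.get_operation_stats("database_health_check")
if stats:
    print(f"DB health check: {stats['call_count']} calls, avg {stats['avg_time']:.3f}s")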