diff --git a/backend/api/projects.py b/backend/api/projects.py
index 5e86385..8a9cc8c 100644
--- a/backend/api/projects.py
+++ b/backend/api/projects.py
@@ -19,6 +19,7 @@
 )
 from services.project_service import get_project_service
 from services.storage_service import storage_service
+from tasks.file_processing import process_csv_file
 
 router = APIRouter(prefix="/projects", tags=["projects"])
 project_service = get_project_service()
@@ -277,6 +278,55 @@ async def get_upload_url(
     )
 
 
+@router.post("/{project_id}/process")
+async def trigger_file_processing(
+    project_id: str, user_id: str = Depends(verify_token)
+) -> ApiResponse[Dict[str, str]]:
+    """Trigger CSV file processing for a project"""
+
+    try:
+        user_uuid = uuid.UUID(user_id)
+        project_uuid = uuid.UUID(project_id)
+
+        # Check if project exists and user owns it
+        if not project_service.check_project_ownership(project_uuid, user_uuid):
+            raise HTTPException(status_code=404, detail="Project not found")
+
+        # Get project to check current status
+        project_db = project_service.get_project_by_id(project_uuid)
+        if project_db.status == "ready":
+            raise HTTPException(status_code=400, detail="Project already processed")
+
+        # Check if file exists in storage
+        object_name = f"{user_id}/{project_id}/data.csv"
+        if not storage_service.file_exists(object_name):
+            raise HTTPException(
+                status_code=400, detail="No file uploaded for processing"
+            )
+
+        # Trigger Celery task
+        task = process_csv_file.delay(project_id, user_id)
+
+        return ApiResponse(
+            success=True,
+            data={
+                "message": "File processing started",
+                "task_id": task.id,
+                "project_id": project_id,
+            },
+        )
+
+    except ValueError as e:
+        raise HTTPException(status_code=400, detail=f"Invalid project ID: {str(e)}")
+    except HTTPException:
+        # Re-raise HTTPExceptions without wrapping them
+        raise
+    except Exception as e:
+        raise HTTPException(
+            status_code=500, detail=f"Failed to start file processing: {str(e)}"
+        )
+
+
 @router.get("/{project_id}/status")
 async def get_project_status(
     project_id: str, user_id: str = Depends(verify_token)
diff --git a/backend/services/project_service.py b/backend/services/project_service.py
index a7d5552..2e92fa3 100644
--- a/backend/services/project_service.py
+++ b/backend/services/project_service.py
@@ -133,20 +133,19 @@ def update_project_status(
 
     def update_project_metadata(
         self,
         project_id: uuid.UUID,
-        csv_filename: str,
         row_count: int,
         column_count: int,
         columns_metadata: list,
+        status: ProjectStatusEnum = ProjectStatusEnum.READY,
     ) -> ProjectInDB:
         """Update project metadata after file processing"""
         return self.update_project(
             project_id,
             ProjectUpdate(
-                csv_filename=csv_filename,
                 row_count=row_count,
                 column_count=column_count,
                 columns_metadata=columns_metadata,
-                status=ProjectStatusEnum.READY,
+                status=status,
             ),
         )
diff --git a/backend/tasks/file_processing.py b/backend/tasks/file_processing.py
index 11665ee..dab2af6 100644
--- a/backend/tasks/file_processing.py
+++ b/backend/tasks/file_processing.py
@@ -1,56 +1,130 @@
 import logging
 import os
+import uuid
+from io import StringIO
+from typing import Any, Dict, List, Optional
 
+import pandas as pd
 from celery import current_task
 
 from celery_app import celery_app
+from services.database_service import get_db_service
+from services.project_service import get_project_service
+from services.storage_service import storage_service
 
 logger = logging.getLogger(__name__)
 
 
 @celery_app.task(bind=True)
-def process_csv_file(self, project_id: str, file_path: str):
+def process_csv_file(self, project_id: str, user_id: str):
     """
-    Process uploaded CSV file - placeholder implementation for Task B2
-    Will be fully implemented in Task B12
+    Process uploaded CSV file for project analysis
     """
+    # Parse the IDs before entering the try block: a malformed ID raises
+    # ValueError immediately, and the except handler below can then reference
+    # project_uuid without risking a NameError that would mask the real error.
+    project_uuid = uuid.UUID(project_id)
+    user_uuid = uuid.UUID(user_id)
+
     try:
         # Update task state
         self.update_state(
             state="PROGRESS",
             meta={"current": 10, "total": 100, "status": "Starting CSV analysis..."},
         )
 
-        logger.info(f"Processing CSV file for project {project_id}: {file_path}")
+        logger.info(f"Processing CSV file for project {project_id}")
 
-        # Simulate processing steps
-        import time
+        # Get project service
+        project_service = get_project_service()
+
+        # Update project status to processing
+        project_service.update_project_status(project_uuid, "processing")
+
+        # Download file from MinIO
+        self.update_state(
+            state="PROGRESS",
+            meta={"current": 20, "total": 100, "status": "Downloading file..."},
+        )
 
-        time.sleep(2)
+        object_name = f"{user_id}/{project_id}/data.csv"
+        file_content = storage_service.download_file(object_name)
 
+        if not file_content:
+            raise Exception("Failed to download file from storage")
+
+        # Parse CSV with pandas
         self.update_state(
             state="PROGRESS",
-            meta={"current": 50, "total": 100, "status": "Analyzing schema..."},
+            meta={"current": 40, "total": 100, "status": "Parsing CSV..."},
         )
 
-        time.sleep(2)
+        try:
+            df = pd.read_csv(StringIO(file_content.decode("utf-8")))
+        except Exception as e:
+            raise Exception(f"Failed to parse CSV: {str(e)}")
 
+        # Analyze schema
         self.update_state(
             state="PROGRESS",
-            meta={"current": 90, "total": 100, "status": "Finalizing..."},
+            meta={"current": 60, "total": 100, "status": "Analyzing schema..."},
+        )
+
+        columns_metadata = []
+        for column in df.columns:
+            col_type = str(df[column].dtype)
+
+            # Determine data type category
+            if "int" in col_type or "float" in col_type:
+                data_type = "number"
+            elif "datetime" in col_type:
+                data_type = "datetime"
+            elif "bool" in col_type:
+                data_type = "boolean"
+            else:
+                data_type = "string"
+
+            # Check for null values; cast to a plain bool because Series.any()
+            # returns numpy.bool_, which is not JSON-serializable downstream
+            nullable = bool(df[column].isnull().any())
+
+            # Get sample values (first 5 non-null values)
+            sample_values = df[column].dropna().head(5).tolist()
+
+            columns_metadata.append(
+                {
+                    "name": column,
+                    "type": data_type,
+                    "nullable": nullable,
+                    "sample_values": sample_values,
+                }
+            )
+
+        # Update project with analysis results
+        self.update_state(
+            state="PROGRESS",
+            meta={"current": 80, "total": 100, "status": "Updating project..."},
+        )
+
+        project_service.update_project_metadata(
+            project_uuid,
+            row_count=len(df),
+            column_count=len(df.columns),
+            columns_metadata=columns_metadata,
+            status="ready",
+        )
+
+        # Final update
+        self.update_state(
+            state="PROGRESS",
+            meta={"current": 100, "total": 100, "status": "Processing complete"},
         )
 
-        # Mock result
         result = {
             "project_id": project_id,
             "status": "completed",
-            "row_count": 1000,
-            "column_count": 10,
-            "columns_metadata": [
-                {"name": "id", "type": "number", "nullable": False},
-                {"name": "name", "type": "string", "nullable": False},
-                {"name": "email", "type": "string", "nullable": True},
-            ],
+            "row_count": len(df),
+            "column_count": len(df.columns),
+            "columns_metadata": columns_metadata,
         }
 
         logger.info(f"Successfully processed CSV for project {project_id}")
@@ -58,6 +132,15 @@ def process_csv_file(self, project_id: str, file_path: str):
 
     except Exception as exc:
         logger.error(f"Error processing CSV for project {project_id}: {str(exc)}")
+
+        # Best-effort: mark the project as errored so it is not left stuck in
+        # "processing"; never let this secondary failure mask the original exc
+        try:
+            project_service = get_project_service()
+            project_service.update_project_status(project_uuid, "error")
+        except Exception:
+            logger.exception(f"Failed to set error status for project {project_id}")
+
         self.update_state(
             state="FAILURE", meta={"error": str(exc), "project_id": project_id}
         )
diff --git a/backend/tests/test_file_processing.py b/backend/tests/test_file_processing.py
new file mode 100644
index 0000000..1da67ef
--- /dev/null
+++ b/backend/tests/test_file_processing.py
@@ -0,0 +1,136 @@
+from io import StringIO
+from unittest.mock import Mock, patch
+
+import pandas as pd
+import pytest
+
+from tasks.file_processing import process_csv_file
+
+
+class TestFileProcessing:
+    """Test Celery file processing tasks"""
+
+    @patch("tasks.file_processing.storage_service")
+    @patch("tasks.file_processing.get_project_service")
+    @patch.object(process_csv_file, "update_state", autospec=True)
+    def test_process_csv_file_success(
+        self, mock_update_state, mock_project_service, mock_storage_service
+    ):
+        """Test successful CSV file processing"""
+        # Mock project service
+        mock_service = Mock()
+        mock_project_service.return_value = mock_service
+
+        # Mock storage service
+        csv_content = """id,name,email,age
+1,John Doe,john@example.com,30
+2,Jane Smith,jane@example.com,25
+3,Bob Johnson,bob@example.com,35"""
+
+        mock_storage_service.download_file.return_value = csv_content.encode("utf-8")
+
+        # Call the task function directly (not through Celery wrapper)
+        result = process_csv_file.run(
+            "00000000-0000-0000-0000-000000000001",
+            "00000000-0000-0000-0000-000000000002",
+        )
+
+        # Verify storage service was called
+        mock_storage_service.download_file.assert_called_once_with(
+            "00000000-0000-0000-0000-000000000002/00000000-0000-0000-0000-000000000001/data.csv"
+        )
+
+        # Verify project service was called
+        mock_service.update_project_status.assert_called()
+        mock_service.update_project_metadata.assert_called()
+
+        # Verify result
+        assert result["project_id"] == "00000000-0000-0000-0000-000000000001"
+        assert result["status"] == "completed"
+        assert result["row_count"] == 3
+        assert result["column_count"] == 4
+
+        # Verify column metadata
+        columns_metadata = result["columns_metadata"]
+        assert len(columns_metadata) == 4
+
+        # Check that columns are detected
+        column_names = [col["name"] for col in columns_metadata]
+        assert "id" in column_names
+        assert "name" in column_names
+        assert "email" in column_names
+        assert "age" in column_names
+
+    @patch("tasks.file_processing.storage_service")
+    @patch("tasks.file_processing.get_project_service")
+    @patch.object(process_csv_file, "update_state", autospec=True)
+    def test_process_csv_file_download_failure(
+        self, mock_update_state, mock_project_service, mock_storage_service
+    ):
+        """Test CSV processing when file download fails"""
+        # Mock project service
+        mock_service = Mock()
+        mock_project_service.return_value = mock_service
+
+        # Mock storage service to return None (download failure)
+        mock_storage_service.download_file.return_value = None
+
+        # Call the task and expect exception
+        with pytest.raises(Exception, match="Failed to download file from storage"):
+            process_csv_file.run(
+                "00000000-0000-0000-0000-000000000001",
+                "00000000-0000-0000-0000-000000000002",
+            )
+
+    @patch("tasks.file_processing.storage_service")
+    @patch("tasks.file_processing.get_project_service")
+    @patch.object(process_csv_file, "update_state", autospec=True)
+    @patch("tasks.file_processing.pd.read_csv")
+    def test_process_csv_file_parse_failure(
+        self,
+        mock_pandas_read_csv,
+        mock_update_state,
+        mock_project_service,
+        mock_storage_service,
+    ):
+        """Test CSV processing when file parsing fails"""
+        # Mock project service
+        mock_service = Mock()
+        mock_project_service.return_value = mock_service
+
+        # Mock storage service to return valid content
+        mock_storage_service.download_file.return_value = b"valid,csv,content"
+
+        # Mock pandas to raise an exception
+        mock_pandas_read_csv.side_effect = Exception("CSV parsing failed")
+
+        # Call the task and expect exception
+        with pytest.raises(Exception, match="Failed to parse CSV"):
+            process_csv_file.run(
+                "00000000-0000-0000-0000-000000000001",
+                "00000000-0000-0000-0000-000000000002",
+            )
+
+    @patch("tasks.file_processing.storage_service")
+    @patch("tasks.file_processing.get_project_service")
+    @patch.object(process_csv_file, "update_state", autospec=True)
+    def test_process_csv_file_error_handling(
+        self, mock_update_state, mock_project_service, mock_storage_service
+    ):
+        """Test error handling in CSV processing"""
+        # Mock project service to raise exception
+        mock_service = Mock()
+        mock_service.update_project_status.side_effect = Exception("Database error")
+        mock_project_service.return_value = mock_service
+
+        # Mock storage service
+        csv_content = """id,name
+1,John Doe"""
+        mock_storage_service.download_file.return_value = csv_content.encode("utf-8")
+
+        # Call the task and expect exception
+        with pytest.raises(Exception, match="Database error"):
+            process_csv_file.run(
+                "00000000-0000-0000-0000-000000000001",
+                "00000000-0000-0000-0000-000000000002",
+            )