50 changes: 50 additions & 0 deletions backend/api/projects.py
@@ -19,6 +19,7 @@
)
from services.project_service import get_project_service
from services.storage_service import storage_service
from tasks.file_processing import process_csv_file

router = APIRouter(prefix="/projects", tags=["projects"])
project_service = get_project_service()
@@ -277,6 +278,55 @@ async def get_upload_url(
)


@router.post("/{project_id}/process")
async def trigger_file_processing(
project_id: str, user_id: str = Depends(verify_token)
) -> ApiResponse[Dict[str, str]]:
"""Trigger CSV file processing for a project"""

try:
user_uuid = uuid.UUID(user_id)
project_uuid = uuid.UUID(project_id)

# Check if project exists and user owns it
if not project_service.check_project_ownership(project_uuid, user_uuid):
raise HTTPException(status_code=404, detail="Project not found")

# Get project to check current status
project_db = project_service.get_project_by_id(project_uuid)
if project_db.status == "ready":
raise HTTPException(status_code=400, detail="Project already processed")

# Check if file exists in storage
object_name = f"{user_id}/{project_id}/data.csv"
if not storage_service.file_exists(object_name):
raise HTTPException(
status_code=400, detail="No file uploaded for processing"
)

# Trigger Celery task
task = process_csv_file.delay(project_id, user_id)

return ApiResponse(
success=True,
data={
"message": "File processing started",
"task_id": task.id,
"project_id": project_id,
},
)

except ValueError as e:
raise HTTPException(status_code=400, detail=f"Invalid project ID: {str(e)}")
except HTTPException:
# Re-raise HTTPExceptions without wrapping them
raise
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start file processing: {str(e)}"
)


@router.get("/{project_id}/status")
async def get_project_status(
project_id: str, user_id: str = Depends(verify_token)
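For a quick sanity check of the new route, a minimal client sketch is shown below. It assumes the router is mounted without an extra path prefix, the API listens on localhost:8000, and verify_token accepts a bearer token; none of these details are confirmed by the diff.

import requests

# Assumed values -- base URL, auth header format, and the token are
# illustrative only; this diff does not pin them down.
BASE_URL = "http://localhost:8000"
HEADERS = {"Authorization": "Bearer <token>"}
project_id = "00000000-0000-0000-0000-000000000001"

# Kick off processing; the response data carries the Celery task id.
resp = requests.post(f"{BASE_URL}/projects/{project_id}/process", headers=HEADERS)
resp.raise_for_status()
task_id = resp.json()["data"]["task_id"]

# Poll the existing status endpoint until the project reaches "ready".
status = requests.get(f"{BASE_URL}/projects/{project_id}/status", headers=HEADERS)
print(task_id, status.json())
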
5 changes: 2 additions & 3 deletions backend/services/project_service.py
@@ -133,20 +133,19 @@ def update_project_status(
def update_project_metadata(
self,
project_id: uuid.UUID,
csv_filename: str,
row_count: int,
column_count: int,
columns_metadata: list,
status: ProjectStatusEnum = ProjectStatusEnum.READY,
) -> ProjectInDB:
"""Update project metadata after file processing"""
return self.update_project(
project_id,
ProjectUpdate(
csv_filename=csv_filename,
row_count=row_count,
column_count=column_count,
columns_metadata=columns_metadata,
status=ProjectStatusEnum.READY,
status=status,
),
)

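With csv_filename dropped and status now caller-controlled (defaulting to READY), a usage sketch looks like the following; the ProjectStatusEnum import path and the project_uuid/df/columns_metadata names are placeholders for illustration, not taken from this diff.

from services.project_service import get_project_service

# The module that defines ProjectStatusEnum is not shown in this diff;
# this import path is an assumption.
from models.project import ProjectStatusEnum

service = get_project_service()

# status defaults to READY, so existing callers are unchanged; the task
# below passes it explicitly once schema analysis succeeds.
service.update_project_metadata(
    project_uuid,  # placeholder uuid.UUID of an existing project
    row_count=len(df),
    column_count=len(df.columns),
    columns_metadata=columns_metadata,
    status=ProjectStatusEnum.READY,
)
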
115 changes: 97 additions & 18 deletions backend/tasks/file_processing.py
@@ -1,63 +1,142 @@
import logging
import os
import uuid
from io import StringIO
from typing import Any, Dict, List, Optional

import pandas as pd
from celery import current_task

from celery_app import celery_app
from services.database_service import get_db_service
from services.project_service import get_project_service
from services.storage_service import storage_service

logger = logging.getLogger(__name__)


@celery_app.task(bind=True)
def process_csv_file(self, project_id: str, file_path: str):
def process_csv_file(self, project_id: str, user_id: str):
"""
Process uploaded CSV file - placeholder implementation for Task B2
Will be fully implemented in Task B12
Process uploaded CSV file for project analysis
"""
try:
project_uuid = uuid.UUID(project_id)
user_uuid = uuid.UUID(user_id)

# Update task state
self.update_state(
state="PROGRESS",
meta={"current": 10, "total": 100, "status": "Starting CSV analysis..."},
)

logger.info(f"Processing CSV file for project {project_id}: {file_path}")
logger.info(f"Processing CSV file for project {project_id}")

# Simulate processing steps
import time
# Get project service
project_service = get_project_service()

# Update project status to processing
project_service.update_project_status(project_uuid, "processing")

# Download file from MinIO
self.update_state(
state="PROGRESS",
meta={"current": 20, "total": 100, "status": "Downloading file..."},
)

time.sleep(2)
object_name = f"{user_id}/{project_id}/data.csv"
file_content = storage_service.download_file(object_name)

if not file_content:
raise Exception("Failed to download file from storage")

# Parse CSV with pandas
self.update_state(
state="PROGRESS",
meta={"current": 50, "total": 100, "status": "Analyzing schema..."},
meta={"current": 40, "total": 100, "status": "Parsing CSV..."},
)

time.sleep(2)
try:
df = pd.read_csv(StringIO(file_content.decode("utf-8")))
except Exception as e:
raise Exception(f"Failed to parse CSV: {str(e)}")

# Analyze schema
self.update_state(
state="PROGRESS",
meta={"current": 90, "total": 100, "status": "Finalizing..."},
meta={"current": 60, "total": 100, "status": "Analyzing schema..."},
)

columns_metadata = []
for column in df.columns:
col_type = str(df[column].dtype)

# Determine data type category
if "int" in col_type or "float" in col_type:
data_type = "number"
elif "datetime" in col_type:
data_type = "datetime"
elif "bool" in col_type:
data_type = "boolean"
else:
data_type = "string"

# Check for null values
nullable = df[column].isnull().any()

# Get sample values (first 5 non-null values)
sample_values = df[column].dropna().head(5).tolist()

columns_metadata.append(
{
"name": column,
"type": data_type,
"nullable": nullable,
"sample_values": sample_values,
}
)

# Update project with analysis results
self.update_state(
state="PROGRESS",
meta={"current": 80, "total": 100, "status": "Updating project..."},
)

project_service.update_project_metadata(
project_uuid,
row_count=len(df),
column_count=len(df.columns),
columns_metadata=columns_metadata,
status="ready",
)

# Final update
self.update_state(
state="PROGRESS",
meta={"current": 100, "total": 100, "status": "Processing complete"},
)

# Mock result
result = {
"project_id": project_id,
"status": "completed",
"row_count": 1000,
"column_count": 10,
"columns_metadata": [
{"name": "id", "type": "number", "nullable": False},
{"name": "name", "type": "string", "nullable": False},
{"name": "email", "type": "string", "nullable": True},
],
"row_count": len(df),
"column_count": len(df.columns),
"columns_metadata": columns_metadata,
}

logger.info(f"Successfully processed CSV for project {project_id}")
return result

except Exception as exc:
logger.error(f"Error processing CSV for project {project_id}: {str(exc)}")

# Update project status to error
try:
project_service = get_project_service()
project_service.update_project_status(project_uuid, "error")
        except Exception:
            # Best-effort status update; don't mask the original error
            pass

self.update_state(
state="FAILURE", meta={"error": str(exc), "project_id": project_id}
)
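To see what the schema-analysis loop produces, here is a standalone sketch of the same dtype mapping run on an in-memory frame (pandas only, no Celery, storage, or database involved):

from io import StringIO

import pandas as pd

df = pd.read_csv(StringIO("id,name,signup\n1,Ada,2024-01-02\n2,,2024-02-03"))

columns_metadata = []
for column in df.columns:
    col_type = str(df[column].dtype)

    # Same dtype-to-category mapping as process_csv_file above.
    if "int" in col_type or "float" in col_type:
        data_type = "number"
    elif "datetime" in col_type:
        data_type = "datetime"
    elif "bool" in col_type:
        data_type = "boolean"
    else:
        data_type = "string"

    columns_metadata.append(
        {
            "name": column,
            "type": data_type,
            "nullable": df[column].isnull().any(),
            "sample_values": df[column].dropna().head(5).tolist(),
        }
    )

# e.g. the "id" column comes back as type "number", nullable False,
# sample_values [1, 2]; "name" is "string" and nullable because of the
# empty second value.
print(columns_metadata)
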
136 changes: 136 additions & 0 deletions backend/tests/test_file_processing.py
@@ -0,0 +1,136 @@
from io import StringIO
from unittest.mock import Mock, patch

import pandas as pd
import pytest

from tasks.file_processing import process_csv_file


class TestFileProcessing:
"""Test Celery file processing tasks"""

@patch("tasks.file_processing.storage_service")
@patch("tasks.file_processing.get_project_service")
@patch.object(process_csv_file, "update_state", autospec=True)
def test_process_csv_file_success(
self, mock_update_state, mock_project_service, mock_storage_service
):
"""Test successful CSV file processing"""
# Mock project service
mock_service = Mock()
mock_project_service.return_value = mock_service

# Mock storage service
csv_content = """id,name,email,age
1,John Doe,john@example.com,30
2,Jane Smith,jane@example.com,25
3,Bob Johnson,bob@example.com,35"""

mock_storage_service.download_file.return_value = csv_content.encode("utf-8")

# Call the task function directly (not through Celery wrapper)
result = process_csv_file.run(
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
)

# Verify storage service was called
mock_storage_service.download_file.assert_called_once_with(
"00000000-0000-0000-0000-000000000002/00000000-0000-0000-0000-000000000001/data.csv"
)

# Verify project service was called
mock_service.update_project_status.assert_called()
mock_service.update_project_metadata.assert_called()

# Verify result
assert result["project_id"] == "00000000-0000-0000-0000-000000000001"
assert result["status"] == "completed"
assert result["row_count"] == 3
assert result["column_count"] == 4

# Verify column metadata
columns_metadata = result["columns_metadata"]
assert len(columns_metadata) == 4

# Check that columns are detected
column_names = [col["name"] for col in columns_metadata]
assert "id" in column_names
assert "name" in column_names
assert "email" in column_names
assert "age" in column_names

@patch("tasks.file_processing.storage_service")
@patch("tasks.file_processing.get_project_service")
@patch.object(process_csv_file, "update_state", autospec=True)
def test_process_csv_file_download_failure(
self, mock_update_state, mock_project_service, mock_storage_service
):
"""Test CSV processing when file download fails"""
# Mock project service
mock_service = Mock()
mock_project_service.return_value = mock_service

# Mock storage service to return None (download failure)
mock_storage_service.download_file.return_value = None

# Call the task and expect exception
with pytest.raises(Exception, match="Failed to download file from storage"):
process_csv_file.run(
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
)

@patch("tasks.file_processing.storage_service")
@patch("tasks.file_processing.get_project_service")
@patch.object(process_csv_file, "update_state", autospec=True)
@patch("tasks.file_processing.pd.read_csv")
def test_process_csv_file_parse_failure(
self,
mock_pandas_read_csv,
mock_update_state,
mock_project_service,
mock_storage_service,
):
"""Test CSV processing when file parsing fails"""
# Mock project service
mock_service = Mock()
mock_project_service.return_value = mock_service

# Mock storage service to return valid content
mock_storage_service.download_file.return_value = b"valid,csv,content"

# Mock pandas to raise an exception
mock_pandas_read_csv.side_effect = Exception("CSV parsing failed")

# Call the task and expect exception
with pytest.raises(Exception, match="Failed to parse CSV"):
process_csv_file.run(
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
)

@patch("tasks.file_processing.storage_service")
@patch("tasks.file_processing.get_project_service")
@patch.object(process_csv_file, "update_state", autospec=True)
def test_process_csv_file_error_handling(
self, mock_update_state, mock_project_service, mock_storage_service
):
"""Test error handling in CSV processing"""
# Mock project service to raise exception
mock_service = Mock()
mock_service.update_project_status.side_effect = Exception("Database error")
mock_project_service.return_value = mock_service

# Mock storage service
csv_content = """id,name
1,John Doe"""
mock_storage_service.download_file.return_value = csv_content.encode("utf-8")

# Call the task and expect exception
with pytest.raises(Exception, match="Database error"):
process_csv_file.run(
"00000000-0000-0000-0000-000000000001",
"00000000-0000-0000-0000-000000000002",
)
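
These tests call the task body directly via .run(), which executes synchronously and bypasses the broker, whereas the API dispatches it with .delay(). For completeness, a minimal sketch of inspecting a dispatched task by id, assuming celery_app is importable here the same way it is inside the task module:

from celery_app import celery_app
from tasks.file_processing import process_csv_file

project_id = "00000000-0000-0000-0000-000000000001"  # placeholder ids
user_id = "00000000-0000-0000-0000-000000000002"

# Asynchronous dispatch, as done by the /process endpoint.
async_result = process_csv_file.delay(project_id, user_id)

# Look the task up by id later and read its PROGRESS/SUCCESS metadata.
result = celery_app.AsyncResult(async_result.id)
print(result.state, result.info)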