Merged
backend/api/projects.py: 51 changes (50 additions, 1 deletion)
@@ -19,7 +19,7 @@
)
from services.project_service import get_project_service
from services.storage_service import storage_service
-from tasks.file_processing import process_csv_file
+from tasks.file_processing import analyze_csv_schema, process_csv_file

router = APIRouter(prefix="/projects", tags=["projects"])
project_service = get_project_service()
@@ -327,6 +327,55 @@ async def trigger_file_processing(
)


@router.post("/{project_id}/analyze-schema")
async def trigger_schema_analysis(
project_id: str, user_id: str = Depends(verify_token)
) -> ApiResponse[Dict[str, str]]:
"""Trigger standalone schema analysis for a project"""

try:
user_uuid = uuid.UUID(user_id)
project_uuid = uuid.UUID(project_id)

# Check if project exists and user owns it
if not project_service.check_project_ownership(project_uuid, user_uuid):
raise HTTPException(status_code=404, detail="Project not found")

# Check if file exists in storage
object_name = f"{user_id}/{project_id}/data.csv"
if not storage_service.file_exists(object_name):
raise HTTPException(status_code=400, detail="No file uploaded for analysis")

# Download file content
file_content = storage_service.download_file(object_name)
if not file_content:
raise HTTPException(
status_code=400, detail="Failed to download file for analysis"
)

# Trigger standalone schema analysis task
task = analyze_csv_schema.delay(file_content, f"project_{project_id}_data.csv")

return ApiResponse(
success=True,
data={
"message": "Schema analysis started",
"task_id": task.id,
"project_id": project_id,
},
)

except ValueError as e:
raise HTTPException(status_code=400, detail=f"Invalid project ID: {str(e)}")
except HTTPException:
# Re-raise HTTPExceptions without wrapping them
raise
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to start schema analysis: {str(e)}"
)


@router.get("/{project_id}/status")
async def get_project_status(
project_id: str, user_id: str = Depends(verify_token)
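For context, a minimal client-side sketch of calling the new endpoint (illustrative only; the base URL, bearer-token scheme, and project UUID below are placeholders, not part of this diff):

import requests

BASE_URL = "http://localhost:8000"  # assumed local dev server
TOKEN = "<token accepted by verify_token>"  # assumed auth scheme
PROJECT_ID = "<existing project uuid>"

# POST /projects/{project_id}/analyze-schema triggers the Celery task
resp = requests.post(
    f"{BASE_URL}/projects/{PROJECT_ID}/analyze-schema",
    headers={"Authorization": f"Bearer {TOKEN}"},
)
resp.raise_for_status()
data = resp.json()["data"]
# Per the handler above: {"message": "Schema analysis started",
#                         "task_id": "<celery task id>", "project_id": PROJECT_ID}
print(data["task_id"])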
backend/tasks/file_processing.py: 239 changes (204 additions, 35 deletions)
@@ -70,6 +70,7 @@ def process_csv_file(self, project_id: str, user_id: str):
columns_metadata = []
for column in df.columns:
col_type = str(df[column].dtype)
col_series = df[column]

# Determine data type category
if "int" in col_type or "float" in col_type:
@@ -82,20 +83,88 @@ def process_csv_file(self, project_id: str, user_id: str):
data_type = "string"

# Check for null values
-nullable = df[column].isnull().any()
+nullable = col_series.isnull().any()
null_count = col_series.isnull().sum()
null_percentage = (null_count / len(col_series)) * 100

# Get sample values (first 5 non-null values)
-sample_values = df[column].dropna().head(5).tolist()
+sample_values = col_series.dropna().head(5).tolist()

# Calculate statistics for numeric columns
statistics = {}
if data_type == "number":
statistics = {
"min": float(col_series.min()) if not col_series.empty else None,
"max": float(col_series.max()) if not col_series.empty else None,
"mean": float(col_series.mean()) if not col_series.empty else None,
"median": (
float(col_series.median()) if not col_series.empty else None
),
"std": float(col_series.std()) if not col_series.empty else None,
}
elif data_type == "string":
# String statistics
unique_count = col_series.nunique()
most_common = col_series.mode().tolist() if not col_series.empty else []
avg_length = col_series.str.len().mean() if not col_series.empty else 0
statistics = {
"unique_count": int(unique_count),
"most_common_values": most_common[:3], # Top 3 most common
"average_length": (
float(avg_length) if not pd.isna(avg_length) else 0
),
}

# Detect potential data quality issues
data_quality_issues = []
if null_percentage > 50:
data_quality_issues.append("high_null_percentage")
if data_type == "string" and col_series.nunique() == 1:
data_quality_issues.append("single_value_column")
if data_type == "number" and col_series.std() == 0:
data_quality_issues.append("no_variance")

columns_metadata.append(
{
"name": column,
"type": data_type,
"nullable": nullable,
"null_count": int(null_count),
"null_percentage": round(null_percentage, 2),
"sample_values": sample_values,
"statistics": statistics,
"data_quality_issues": data_quality_issues,
}
)

# Calculate dataset-level insights
dataset_insights = {
"total_rows": len(df),
"total_columns": len(df.columns),
"total_cells": len(df) * len(df.columns),
"null_cells": df.isnull().sum().sum(),
"null_percentage": round(
(df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100, 2
),
"duplicate_rows": int(df.duplicated().sum()),
"duplicate_percentage": round((df.duplicated().sum() / len(df)) * 100, 2),
"numeric_columns": len(
[col for col in columns_metadata if col["type"] == "number"]
),
"string_columns": len(
[col for col in columns_metadata if col["type"] == "string"]
),
"datetime_columns": len(
[col for col in columns_metadata if col["type"] == "datetime"]
),
"boolean_columns": len(
[col for col in columns_metadata if col["type"] == "boolean"]
),
"columns_with_issues": len(
[col for col in columns_metadata if col["data_quality_issues"]]
),
}

# Update project with analysis results
self.update_state(
state="PROGRESS",
@@ -122,6 +191,7 @@ def process_csv_file(self, project_id: str, user_id: str):
"row_count": len(df),
"column_count": len(df.columns),
"columns_metadata": columns_metadata,
"dataset_insights": dataset_insights,
}

logger.info(f"Successfully processed CSV for project {project_id}")
@@ -144,48 +214,147 @@ def process_csv_file(self, project_id: str, user_id: str):


@celery_app.task(bind=True)
-def analyze_csv_schema(self, file_path: str):
+def analyze_csv_schema(self, file_content: bytes, filename: str = "data.csv"):
"""
-Analyze CSV schema - placeholder implementation for Task B2
-Will be fully implemented in Task B13
+Analyze CSV schema independently - enhanced implementation for Task B13
"""
try:
logger.info(f"Analyzing CSV schema: {file_path}")
logger.info(f"Analyzing CSV schema for file: {filename}")

-# Simulate schema analysis
-import time
+# Update task state
+self.update_state(
+state="PROGRESS",
+meta={"current": 20, "total": 100, "status": "Parsing CSV..."},
+)

-time.sleep(1)
+# Parse CSV with pandas
+try:
+df = pd.read_csv(StringIO(file_content.decode("utf-8")))
+except Exception as e:
+raise Exception(f"Failed to parse CSV: {str(e)}")

-# Mock schema result
-schema = {
-"columns": [
-{
-"name": "id",
-"type": "integer",
-"nullable": False,
-"sample_values": [1, 2, 3],
-},
-{
-"name": "name",
-"type": "string",
-"nullable": False,
-"sample_values": ["John", "Jane", "Bob"],
-},
+# Update task state
+self.update_state(
+state="PROGRESS",
+meta={"current": 60, "total": 100, "status": "Analyzing schema..."},
+)

# Analyze columns
columns_metadata = []
for column in df.columns:
col_type = str(df[column].dtype)
col_series = df[column]

# Determine data type category
if "int" in col_type or "float" in col_type:
data_type = "number"
elif "datetime" in col_type:
data_type = "datetime"
elif "bool" in col_type:
data_type = "boolean"
else:
data_type = "string"

# Check for null values
nullable = col_series.isnull().any()
null_count = col_series.isnull().sum()
null_percentage = (null_count / len(col_series)) * 100

# Get sample values (first 5 non-null values)
sample_values = col_series.dropna().head(5).tolist()

# Calculate statistics for numeric columns
statistics = {}
if data_type == "number":
statistics = {
"min": float(col_series.min()) if not col_series.empty else None,
"max": float(col_series.max()) if not col_series.empty else None,
"mean": float(col_series.mean()) if not col_series.empty else None,
"median": (
float(col_series.median()) if not col_series.empty else None
),
"std": float(col_series.std()) if not col_series.empty else None,
}
elif data_type == "string":
# String statistics
unique_count = col_series.nunique()
most_common = col_series.mode().tolist() if not col_series.empty else []
avg_length = col_series.str.len().mean() if not col_series.empty else 0
statistics = {
"unique_count": int(unique_count),
"most_common_values": most_common[:3], # Top 3 most common
"average_length": (
float(avg_length) if not pd.isna(avg_length) else 0
),
}

# Detect potential data quality issues
data_quality_issues = []
if null_percentage > 50:
data_quality_issues.append("high_null_percentage")
if data_type == "string" and col_series.nunique() == 1:
data_quality_issues.append("single_value_column")
if data_type == "number" and col_series.std() == 0:
data_quality_issues.append("no_variance")

+columns_metadata.append(
{
-"name": "age",
-"type": "integer",
-"nullable": True,
-"sample_values": [25, 30, None],
-},
-],
-"row_count": 1000,
-"file_size": "2.5 MB",
+"name": column,
+"type": data_type,
+"nullable": nullable,
+"null_count": int(null_count),
+"null_percentage": round(null_percentage, 2),
+"sample_values": sample_values,
+"statistics": statistics,
+"data_quality_issues": data_quality_issues,
+}
+)

# Calculate dataset-level insights
dataset_insights = {
"total_rows": len(df),
"total_columns": len(df.columns),
"total_cells": len(df) * len(df.columns),
"null_cells": df.isnull().sum().sum(),
"null_percentage": round(
(df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100, 2
),
"duplicate_rows": int(df.duplicated().sum()),
"duplicate_percentage": round((df.duplicated().sum() / len(df)) * 100, 2),
"numeric_columns": len(
[col for col in columns_metadata if col["type"] == "number"]
),
"string_columns": len(
[col for col in columns_metadata if col["type"] == "string"]
),
"datetime_columns": len(
[col for col in columns_metadata if col["type"] == "datetime"]
),
"boolean_columns": len(
[col for col in columns_metadata if col["type"] == "boolean"]
),
"columns_with_issues": len(
[col for col in columns_metadata if col["data_quality_issues"]]
),
}

# Update task state
self.update_state(
state="PROGRESS",
meta={"current": 100, "total": 100, "status": "Analysis complete"},
)

schema_result = {
"filename": filename,
"file_size_bytes": len(file_content),
"columns": columns_metadata,
"dataset_insights": dataset_insights,
"analysis_timestamp": pd.Timestamp.now().isoformat(),
}

logger.info(f"Successfully analyzed schema for {file_path}")
return schema
logger.info(f"Successfully analyzed schema for {filename}")
return schema_result

except Exception as exc:
logger.error(f"Error analyzing schema for {file_path}: {str(exc)}")
logger.error(f"Error analyzing schema for {filename}: {str(exc)}")
raise exc
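As a rough idea of how the reworked task can be exercised without a broker (for example in a quick unit test), a hypothetical sketch; it assumes the Celery app and this module are importable in the test environment:

from tasks.file_processing import analyze_csv_schema

# Run the task eagerly in-process; .delay() would instead require a running
# worker plus a result backend, which this sketch does not assume.
sample_csv = b"id,name\n1,Alice\n2,Bob\n3,\n"
result = analyze_csv_schema.apply(args=(sample_csv, "sample.csv")).get()

print(result["dataset_insights"]["total_rows"])  # 3
print(result["columns"][1]["null_count"])        # 1 (the empty "name" cell)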
Binary file modified backend/test.db