From c914737c41fe75ea2a27a7f18f5a3a93e822afa6 Mon Sep 17 00:00:00 2001 From: Astrasv Date: Mon, 2 Feb 2026 20:21:08 +0530 Subject: [PATCH] Feature: Communication enabled with VolPE integrator service --- ARCHITECTURE.md | 89 --------------- README.md | 67 ++++++++++- controllers/problem_controller.go | 93 +++++++++++++++ docker-compose.yaml | 3 +- modules/problem_module.go | 182 +++++++++++++++++++++++++++++- pkg/middleware/log_middleware.go | 6 + routes/api.go | 20 ++-- 7 files changed, 357 insertions(+), 103 deletions(-) delete mode 100644 ARCHITECTURE.md diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md deleted file mode 100644 index fdc88ca..0000000 --- a/ARCHITECTURE.md +++ /dev/null @@ -1,89 +0,0 @@ -# Architecture and Development Guide - -This document outlines the architectural patterns, conventions, and development guidelines for the `controller_microservice_v2` project. Its purpose is to ensure that new code contributions, whether from human developers or AI agents, are consistent with the existing design. - -## 1. Core Architecture: Layered Approach - -The service follows a classic layered architecture to ensure a clean separation of concerns. The flow of a request is as follows: - -**`main.go` -> `routes` -> `controllers` -> `modules` -> `repository` -> `db`** - -- **`cmd/main.go`**: The application entry point. It is responsible for initializing all dependencies (Logger, Database Pool, Clients) and injecting them into the layers that require them. -- **`routes`**: Defines the API endpoints, mapping HTTP methods and URL patterns to specific handler functions in the `controllers`. -- **`controllers`**: The HTTP layer. Handlers in this layer are responsible for parsing and validating requests, calling the appropriate business logic in the `modules` layer, and formatting the response (e.g., writing a JSON payload and an HTTP status code). **Controllers should not contain business logic.** -- **`modules`**: The business logic layer. 
This layer orchestrates tasks by calling data access functions and other services. It is completely unaware of the HTTP layer. -- **`repository` (To Be Implemented):** The data access layer. This layer abstracts the database. It provides interfaces for data operations (e.g., `CreateNotebook`, `GetNotebookByID`). The `modules` layer should depend on these interfaces, not on a concrete database implementation. -- **`db`**: The database implementation layer. It manages the database connection (`pgxpool`) and contains the concrete implementation of the repository interfaces, including the raw SQL queries. - -## 2. API Design and Routing - -- **Versioning**: All API endpoints are versioned under `/api/v1/`. -- **RESTful Principles**: The API should adhere to RESTful principles, using appropriate HTTP methods (`GET`, `POST`, `PUT`, `DELETE`) for corresponding actions. -- **Routing**: We use the standard library's `http.ServeMux` (Go 1.22+). Routes must be registered explicitly with both the HTTP method and the path pattern in `routes/api.go`. - - ```go - // Good: - mux.HandleFunc("POST /api/v1/notebooks", notebookController.CreateNotebookHandler) - mux.HandleFunc("GET /api/v1/notebooks/{id}", notebookController.GetNotebookByIDHandler) - - // Bad (Avoid): - // mux.HandleFunc("/api/v1/notebooks/", monolithicHandler) - ``` - -## 3. Database and Data Access - -- **Schema**: The single source of truth for the database schema is `db/schema.sql`. -- **Repository Pattern**: All database access from the business logic layer (`modules`) **must** go through a repository interface. This decouples the business logic from the database, making the code easier to test and maintain. Do not use `db.Pool` directly within the `modules` package. - - **Example:** - ```go - // 1. Define the interface (e.g., in a new `storage` or `repository` package) - type NotebookRepository interface { - GetByID(ctx context.Context, id string) (*models.Notebook, error) - } - - // 2. 
The module depends on the interface - type NotebookModule struct { - Repo NotebookRepository - } - - // 3. The implementation with SQL lives in the db/repository layer - type PostgresNotebookRepo struct { - DB *pgxpool.Pool - } - - func (p *PostgresNotebookRepo) GetByID(ctx context.Context, id string) (*models.Notebook, error) { - // SQL query logic goes here... - } - ``` - -## 4. Logging - -- **Library**: We use `zerolog` for structured, high-performance logging. -- **No `fmt.Print*`**: The use of `fmt.Println`, `fmt.Printf`, or `log.Print*` is strictly forbidden in the application code. This is enforced by a pre-commit hook. -- **Dependency Injection**: The `zerolog.Logger` instance is initialized in `main.go` and passed as a dependency to any struct that needs to log messages. Always use this injected logger instance. - - ```go - // In a controller method: - c.Logger.Info().Str("kernel_id", kernel.ID).Msg("Kernel started successfully") - c.Logger.Error().Err(err).Msg("Failed to retrieve kernel list") - ``` - -## 5. Configuration - -- **Environment Variables**: All configuration (database URLs, auth tokens, ports) **must** be supplied via environment variables. -- **No Hardcoded Values**: Do not hardcode configuration values in the source code. In development, these are loaded from the `.env` file by `godotenv` in `main.go`. - -## 6. Directory Structure Overview - -- **`/cmd`**: Main application entry point. -- **`/db`**: Database schema, connection, and repository implementations. -- **`/routes`**: API route definitions. -- **`/controllers`**: HTTP request/response handlers. -- **`/modules`**: Business logic layer. -- **`/pkg`**: Shared libraries and utilities safe for external use. - - **`/pkg/models`**: Core data structures (structs). - - **`/pkg/jupyter_client`**: A dedicated client for the Jupyter Gateway service. - - **`/pkg/culler`**: Background process for cleaning up idle kernels. -- **`/scripts`**: Helper scripts for development and CI. 
-- **`/docker`**: Dockerfiles for containerizing services. diff --git a/README.md b/README.md index c3f1211..199341f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,65 @@ -# controller_microservice_v2 -Source code of controller microservice that handles data to and from frontend and jupyter kernel gateway for evolutionary algorithms on click v2 +# Controller Microservice V2 + +Source code of controller microservice that handles data to and from frontend and jupyter kernel gateway for evolutionary algorithms on click v2. + +## Overview + +This service acts as the central orchestrator for the platform, managing: +- [User Sessions & Authentication](https://github.com/Evolutionary-Algorithms-On-Click/auth_microservice) +- Jupyter Kernel Lifecycle (via `jupyter_gateway`) +- Database Interactions (CockroachDB) +- Inter-service communication (gRPC, HTTP) +- [LLM microservice](https://github.com/Evolutionary-Algorithms-On-Click/evocv2_llm_microservice) +- [Volpe Integration service](https://github.com/Evolutionary-Algorithms-On-Click/volpe-integration) + +## Prerequisites + +- **Go**: 1.24.1 or higher +- **Docker**: For containerization and dependencies (CockroachDB, MinIO) +- **Docker Compose**: For orchestration +- **Make**: For running standard commands +- **Lefthook**: For git hooks (optional but recommended) + +## Getting Started + +### 1. Environment Setup + +1. Clone the repository. +2. Ensure you have the necessary environment variables set. Refer to `docker-compose.yaml` for required keys (e.g., `DATABASE_URL`, `JUPYTER_GATEWAY_URL`). + *Note: The project uses `godotenv` to load environment variables from a `.env` file in development.* + +### 2. Running Dependencies + +Start the supporting services (Database, Object Storage, Jupyter Gateway, etc.): + +```bash +make docker-up +``` + +or + +```bash +docker-compose up --build +``` + + +This will spin up CockroachDB, MinIO, Jupyter Gateway, and Python Runner as defined in `docker-compose.yaml`. + +### 3. 
Local Development + +To run the controller service locally: + +```bash +# Install tools and git hooks +make setup + +# Build and run the application +make run +``` + +The service will start on port `8080` (default). + +## API Documentation + +- **HTTP API**: Versioned under `/api/v1/`. Defined in `routes/api.go`. +- **gRPC**: Defined in `proto/authenticate.proto`. diff --git a/controllers/problem_controller.go b/controllers/problem_controller.go index 2545c45..c77d500 100644 --- a/controllers/problem_controller.go +++ b/controllers/problem_controller.go @@ -3,7 +3,9 @@ package controllers import ( "context" "encoding/json" + "errors" "fmt" + "io" "log" "net/http" "time" @@ -212,3 +214,94 @@ func (c *ProblemController) DeleteProblemByIDHandler(w http.ResponseWriter, r *h c.Logger.Info().Str("problemID", problemID).Msg("successfully deleted problem by ID") w.WriteHeader(http.StatusNoContent) } + +// SubmitNotebookHandler handles POST /api/v1/submission/submit +func (c *ProblemController) SubmitNotebookHandler(w http.ResponseWriter, r *http.Request) { + c.Logger.Info().Msg("received request to submit notebook") + + user, ok := r.Context().Value(middleware.UserContextKey).(*middleware.User) + if !ok { + c.Logger.Error().Msg("user not found in context for submission") + http.Error(w, "user not found in context", http.StatusUnauthorized) + return + } + + type submitRequest struct { + NotebookID string `json:"notebook_id"` + Filename string `json:"filename"` + SessionID string `json:"session_id"` + } + + var req submitRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + c.Logger.Error().Err(err).Msg("invalid request body for submission") + http.Error(w, "invalid request body", http.StatusBadRequest) + return + } + + if req.NotebookID == "" || req.Filename == "" || req.SessionID == "" { + http.Error(w, "notebook_id, filename, and session_id are required", http.StatusBadRequest) + return + } + + // We don't limit the context time here as submission might 
take time, or we rely on default timeouts + ctx := r.Context() + + result, err := c.ProblemModule.SubmitNotebook(ctx, user.ID, req.NotebookID, req.SessionID, req.Filename) + if err != nil { + c.Logger.Error().Err(err).Msg("failed to submit notebook") + http.Error(w, fmt.Sprintf("failed to submit notebook: %v", err), http.StatusInternalServerError) + return + } + + pkg.WriteJSONResponseWithLogger(w, http.StatusOK, result, &c.Logger) +} + +// GetSubmissionResultsHandler handles GET /api/v1/submission/results/{problemId} +func (c *ProblemController) GetSubmissionResultsHandler(w http.ResponseWriter, r *http.Request) { + problemID := r.PathValue("problemId") + c.Logger.Info().Str("problemID", problemID).Msg("received request to get submission results") + + ctx := r.Context() + + // Connect to volpe and get the stream + stream, err := c.ProblemModule.GetSubmissionResults(ctx, problemID) + if err != nil { + c.Logger.Error().Err(err).Msg("failed to get results stream") + http.Error(w, fmt.Sprintf("failed to get results: %v", err), http.StatusInternalServerError) + return + } + defer stream.Close() + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + buf := make([]byte, 1024) + for { + n, err := stream.Read(buf) + if n > 0 { + if _, wErr := w.Write(buf[:n]); wErr != nil { + c.Logger.Error().Err(wErr).Msg("failed to write to response") + break + } + flusher.Flush() + } + if err != nil { + if err != io.EOF { + if errors.Is(err, context.Canceled) { + c.Logger.Info().Msg("client disconnected from submission results stream") + } else { + c.Logger.Error().Err(err).Msg("error reading from volpe stream") + } + } + break + } + } +} diff --git a/docker-compose.yaml b/docker-compose.yaml index 807aabd..134c1b4 100644 --- a/docker-compose.yaml +++ 
b/docker-compose.yaml @@ -17,7 +17,8 @@ services: CULL_INTERVAL_MINUTES: 10 IDLE_THRESHOLD_MINUTES: 30 AUTH_GRPC_ADDRESS: "auth:5001" - LLM_MICROSERVICE_URL: "http://host.docker.internal:8000" + LLM_MICROSERVICE_URL: "http://host.docker.internal:5004" + VOLPE_SERVICE_URL: "http://host.docker.internal:7070" USER_DATA_DIR: "/mnt/user_data" networks: - evoc-net diff --git a/modules/problem_module.go b/modules/problem_module.go index 512b7c1..c9d9338 100644 --- a/modules/problem_module.go +++ b/modules/problem_module.go @@ -1,8 +1,16 @@ package modules import ( + "bytes" "context" + "encoding/json" "errors" + "fmt" + "io" + "mime/multipart" + "net/http" + "os" + "path/filepath" "time" "github.com/Thanus-Kumaar/controller_microservice_v2/db/repository" @@ -13,13 +21,25 @@ import ( // ProblemModule encapsulates the business logic for problem statements. type ProblemModule struct { - ProblemRepo repository.ProblemRepository - logger zerolog.Logger + ProblemRepo repository.ProblemRepository + NotebookRepo repository.NotebookRepository + FileModule *FileModule + logger zerolog.Logger } // NewProblemModule creates and returns a new ProblemModule. -func NewProblemModule(problemRepo repository.ProblemRepository, logger zerolog.Logger) *ProblemModule { - return &ProblemModule{ProblemRepo: problemRepo, logger: logger} +func NewProblemModule( + problemRepo repository.ProblemRepository, + notebookRepo repository.NotebookRepository, + fileModule *FileModule, + logger zerolog.Logger, +) *ProblemModule { + return &ProblemModule{ + ProblemRepo: problemRepo, + NotebookRepo: notebookRepo, + FileModule: fileModule, + logger: logger, + } } // WithLogger allows setting the logger for the module. @@ -195,3 +215,157 @@ func (m *ProblemModule) UpdateProblem(ctx context.Context, problemID string, req return updatedProblem, nil } +// SubmitNotebook submits a notebook and a file to the volpe service. 
+func (m *ProblemModule) SubmitNotebook(ctx context.Context, userID, notebookID, sessionID, filename string) (map[string]interface{}, error) { + m.logger.Info().Str("notebookID", notebookID).Str("userID", userID).Msg("submitting notebook") + + // Fetch Notebook + notebook, err := m.NotebookRepo.GetNotebookByID(ctx, notebookID, userID) + if err != nil { + m.logger.Error().Err(err).Msg("failed to fetch notebook") + return nil, fmt.Errorf("failed to fetch notebook: %w", err) + } + + // Prepare Request Data JSON + volpeCells := make([]map[string]interface{}, len(notebook.Cells)) + for i, cell := range notebook.Cells { + volpeCells[i] = map[string]interface{}{ + "cell_type": cell.CellType, + "cell_name": cell.CellName.String, // Use String value, empty if invalid + "source": cell.Source, + "execution_count": nil, // Prompt example uses null + "metadata": map[string]interface{}{ + "cell_index": cell.CellIndex, + }, + } + if cell.ExecutionCount != 0 { + volpeCells[i]["execution_count"] = cell.ExecutionCount + } + } + + requirements := "" + if notebook.Requirements.Valid { + requirements = notebook.Requirements.String + } + + requestData := map[string]interface{}{ + "user_id": userID, + "notebook_id": notebookID, + "notebook": map[string]interface{}{ + "cells": volpeCells, + "metadata": map[string]interface{}{}, + "requirements": requirements, + }, + "requirements": requirements, + } + + requestDataJSON, err := json.Marshal(requestData) + if err != nil { + return nil, fmt.Errorf("failed to marshal request data: %w", err) + } + + // Locate and Read File + // Check for directory traversal in filename + + if filename != filepath.Base(filename) { + return nil, errors.New("invalid filename") + } + filePath := filepath.Join(m.FileModule.BaseDir, sessionID, filename) + file, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("failed to open file: %w", err) + } + defer file.Close() + + // Create Multipart Request + body := &bytes.Buffer{} + writer := 
multipart.NewWriter(body) + + // Add request_data + if err := writer.WriteField("request_data", string(requestDataJSON)); err != nil { + return nil, err + } + + // Add metadata_json + metadataJSON := `{"memory": 1,"targetInstances":8}` + if err := writer.WriteField("metadata_json", metadataJSON); err != nil { + return nil, err + } + + // Add file + part, err := writer.CreateFormFile("file", filename) + if err != nil { + return nil, err + } + if _, err := io.Copy(part, file); err != nil { + return nil, err + } + + if err := writer.Close(); err != nil { + return nil, err + } + + // Send Request + volpeURL := os.Getenv("VOLPE_SERVICE_URL") + if volpeURL == "" { + volpeURL = "http://localhost:7070" + } + reqURL := fmt.Sprintf("%s/submit", volpeURL) + + req, err := http.NewRequestWithContext(ctx, "POST", reqURL, body) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", writer.FormDataContentType()) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send request to volpe: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + m.logger.Error().Int("status", resp.StatusCode).Str("body", string(respBody)).Msg("volpe returned error") + return nil, fmt.Errorf("volpe returned status: %d", resp.StatusCode) + } + + var result map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("failed to decode response: %w", err) + } + + return result, nil +} + +// GetSubmissionResults streams the SSE results from volpe. 
+func (m *ProblemModule) GetSubmissionResults(ctx context.Context, problemID string) (io.ReadCloser, error) { + volpeURL := os.Getenv("VOLPE_SERVICE_URL") + if volpeURL == "" { + volpeURL = "http://localhost:9000" + } + reqURL := fmt.Sprintf("%s/results/%s", volpeURL, problemID) + + req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil) + if err != nil { + return nil, fmt.Errorf("Error with Volpe service: %w", err) + } + + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Cache-Control", "no-cache") + req.Header.Set("Connection", "keep-alive") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to connect to volpe results: %w", err) + } + + if resp.StatusCode != http.StatusOK { + resp.Body.Close() + return nil, fmt.Errorf("volpe returned status: %d", resp.StatusCode) + } + + return resp.Body, nil +} diff --git a/pkg/middleware/log_middleware.go b/pkg/middleware/log_middleware.go index 9af85f0..6502d24 100644 --- a/pkg/middleware/log_middleware.go +++ b/pkg/middleware/log_middleware.go @@ -29,6 +29,12 @@ func (r *statusRecorder) Hijack() (net.Conn, *bufio.ReadWriter, error) { return h.Hijack() } +func (r *statusRecorder) Flush() { + if f, ok := r.ResponseWriter.(http.Flusher); ok { + f.Flush() + } +} + func RequestLogger(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/routes/api.go b/routes/api.go index 0a51ef3..62b0bd6 100644 --- a/routes/api.go +++ b/routes/api.go @@ -22,19 +22,19 @@ func RegisterAPIRoutes(mux *http.ServeMux, c *jupyterclient.Client) { problemRepo := repository.NewProblemRepository(db.Pool).WithLogger(*pkg.Logger) cellRepo := repository.NewCellRepository(db.Pool, *pkg.Logger) - // Initialize Modules - notebookModule := modules.NewNotebookModule(notebookRepo, problemRepo) - llmModule := modules.NewLlmModule(llmRepo) - sessionModule := modules.NewSessionModule(sessionRepo, c, *pkg.Logger, 
notebookRepo) - problemModule := modules.NewProblemModule(problemRepo, *pkg.Logger) // Pass the logger here - cellModule := modules.NewCellModule(cellRepo, *pkg.Logger) - userDataDir := os.Getenv("USER_DATA_DIR") if userDataDir == "" { userDataDir = "/mnt/user_data" } fileModule := modules.NewFileModule(userDataDir) + // Initialize Modules + notebookModule := modules.NewNotebookModule(notebookRepo, problemRepo) + llmModule := modules.NewLlmModule(llmRepo) + sessionModule := modules.NewSessionModule(sessionRepo, c, *pkg.Logger, notebookRepo) + problemModule := modules.NewProblemModule(problemRepo, notebookRepo, fileModule, *pkg.Logger) // Pass the logger here + cellModule := modules.NewCellModule(cellRepo, *pkg.Logger) + // Initialize Controllers notebookController := controllers.NewNotebookController(notebookModule, pkg.Logger) sessionController := controllers.NewSessionController(sessionModule, *pkg.Logger) @@ -58,6 +58,12 @@ func RegisterAPIRoutes(mux *http.ServeMux, c *jupyterclient.Client) { mux.Handle("DELETE /api/v1/problems/{id}", middleware.AuthMiddleware(http.HandlerFunc(problemController.DeleteProblemByIDHandler))) + // VolPE Routes + mux.Handle("POST /api/v1/submission/submit", + middleware.AuthMiddleware(http.HandlerFunc(problemController.SubmitNotebookHandler))) + mux.Handle("GET /api/v1/submission/results/{problemId}", + middleware.AuthMiddleware(http.HandlerFunc(problemController.GetSubmissionResultsHandler))) + // Notebook Routes mux.Handle("POST /api/v1/notebooks", middleware.AuthMiddleware(http.HandlerFunc(notebookController.CreateNotebookHandler)))