diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index ed351c2..94b8ad0 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -29,6 +29,9 @@ jobs: echo PLUGINS_DIR='/.data/plugins' >> .env echo TMP_JOB_LOGS_DIR='/.data/tmp/logs' >> .env + echo MAX_LOCAL_CPUS=4 >> .env + echo MAX_LOCAL_MEMORY=4096 >> .env + echo MINIO_ACCESS_KEY_ID=user >> .env echo MINIO_SECRET_ACCESS_KEY=password >> .env diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a96d0a..b31c8b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,22 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Unreleased +### API +#### POST /processes/{processID}/execution +- Execution mode now determined per OGC API - Processes Requirements 25/26: honors `Prefer: respond-async` header when process supports both modes, defaults to sync otherwise +- Returns `Preference-Applied` response header when async preference is honored + +#### GET /admin/resources +- New endpoint to view resource utilization for local jobs (docker, subprocess) and queue status + +### Configuration +- New `MAX_LOCAL_CPUS` and `MAX_LOCAL_MEMORY` environment variables (or `--max-local-cpus` and `--max-local-memory` CLI flags) to set resource limits for local job scheduling +- Process definitions are validated against these limits at startup and when adding/updating processes via API +- Processes without explicit resource requirements use default values + +### Documentation +- Added sequence diagram for local scheduler + ## [0.2.1] - 2025-12-03 ### API diff --git a/DEV_GUIDE.md b/DEV_GUIDE.md index c246358..7f4bbd8 100644 --- a/DEV_GUIDE.md +++ b/DEV_GUIDE.md @@ -15,10 +15,10 @@ ## Auth - If auth is enabled some or all routes are protected based on env variable `AUTH_LEVEL` settings. -- The middleware validate and parse JWT to verify `X-ProcessAPI-User-Email` header and inject `X-ProcessAPI-User-Roles` header. 
+- The middleware validates and parses JWT to verify `X-SEPEX-User-Email` header and inject `X-SEPEX-User-Roles` header. - A user can use tools like Postman to set these headers themselves, but if auth is enabled, they will be checked against the token. This setup allows adding submitter info to the database when auth is not enabled. -- If auth is enabled `X-ProcessAPI-User-Email` header is mandatory. -- Requests from Service Role will not be verified for `X-ProcessAPI-User-Email`. +- If auth is enabled `X-SEPEX-User-Email` header is mandatory. +- Requests from Service Role will not be verified for `X-SEPEX-User-Email`. - Only service_accounts can post callbacks - Requests from Admin Role are allowed to execute all processes, non-admins must have the role with same name as `processID` to execute that process. - Requests from Admin Role are allowed to retrieve all jobs information, non admins can only retrieve information for jobs that they submitted. @@ -30,6 +30,15 @@ ## Scope - The behavior of logging is unknown for AWS Batch processes with job definitions having number of attempts more than 1. +## Local Scheduler + +**Design decisions:** + +1. ResourceLimits calculated once at startup from flags/env vars. Dynamic reconfiguration rejected because a queued job could block forever if limits are reduced below its requirements after it was already validated and enqueued. + +1. ResourcePool and PendingJobs use `sync.Mutex`. Channels add complexity without benefit for simple state. Go channels use internal mutexes anyway, so performance is similar. + + ## Release/Versioning/Changelog The project uses an automated release workflow triggered by semver tags (e.g., `v1.0.0`, `v1.0.0-beta`). The workflow validates prerequisites, runs security scans, builds multi-platform container images, and creates GitHub releases with auto-generated release notes. 
diff --git a/README.md b/README.md index 7e3a5a0..8505120 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,8 @@ Processes are computational tasks described through a configuration file that ca ### Jobs  -Each execution of a process is called a job. A job can be synchronous or asynchronous depending on which host it is being executed upon. Synchronous jobs return responses after the job has reached a finished state, meaning either successful or failed. The asynchronous jobs return a response immediately with a job id for the client so that the client can monitor the jobs. +Each execution of a process is called a job. A job can be synchronous or asynchronous depending on the process configuration and client preference. Synchronous jobs return responses after the job has reached a finished state, meaning either successful or failed. The asynchronous jobs return a response immediately with a job id for the client so that the client can monitor the jobs. + *Note on Processes: The developers must make sure they choose the right platform to execute a process. The processes that are short-lived and fast and do not create a file resource as an output, for example getting the water surface elevation values for a coordinate from cloud raster, must be registered to run on the local machine so that they are synchronous. 
These kinds of processes should output data in JSON format.* diff --git a/api/auth/keycloak.go b/api/auth/keycloak.go index 78f9d73..3bc67da 100644 --- a/api/auth/keycloak.go +++ b/api/auth/keycloak.go @@ -132,14 +132,14 @@ func (kas *KeycloakAuthStrategy) ValidateToken(tokenString string) (*Claims, err return &claims, nil } -// Validate X-ProcessAPI-User-Email header against user from claims +// Validate X-SEPEX-User-Email header against user from claims func (kas *KeycloakAuthStrategy) ValidateUser(c echo.Context, claims *Claims) (err error) { roles := claims.RealmAccess["roles"] if kas.ServiceRoleName != "" && overlap(roles, []string{kas.ServiceRoleName}) { // assume provided header is correct - } else if claims.Email == "" || !(c.Request().Header.Get("X-ProcessAPI-User-Email") == claims.Email) { - return fmt.Errorf("invalid X-ProcessAPI-User-Email header") + } else if claims.Email == "" || !(c.Request().Header.Get("X-SEPEX-User-Email") == claims.Email) { + return fmt.Errorf("invalid X-SEPEX-User-Email header") } return nil @@ -150,7 +150,7 @@ func (kas *KeycloakAuthStrategy) SetUserRolesHeader(c echo.Context, claims *Clai roles, exists := claims.RealmAccess["roles"] if exists { rolesString := strings.Join(roles, ",") - c.Request().Header.Set("X-ProcessAPI-User-Roles", rolesString) + c.Request().Header.Set("X-SEPEX-User-Roles", rolesString) } else { return nil } diff --git a/api/handlers/config.go b/api/handlers/config.go index a14a609..e87879e 100644 --- a/api/handlers/config.go +++ b/api/handlers/config.go @@ -8,6 +8,8 @@ import ( "fmt" "io" "os" + "runtime" + "strconv" "strings" "text/template" @@ -29,6 +31,14 @@ func (t Template) Render(w io.Writer, name string, data interface{}, c echo.Cont return t.templates.ExecuteTemplate(w, name, data) } +// ResourceLimits holds the maximum resource limits for job scheduling. 
+// This is read once at startup and shared across the application to ensure +// consistent validation between process registration and job execution. +type ResourceLimits struct { + MaxCPUs float32 + MaxMemory int // in MB +} + // Config holds the configuration settings for the REST API server. type Config struct { // Only settings that are typically environment-specific and can be loaded from @@ -39,6 +49,9 @@ type Config struct { AuthLevel int AdminRoleName string ServiceRoleName string + + // Resource limits for local job scheduling (docker/subprocess) + ResourceLimits *ResourceLimits } // RESTHandler encapsulates the operational components and dependencies necessary for handling @@ -56,6 +69,9 @@ type RESTHandler struct { DB jobs.Database MessageQueue *jobs.MessageQueue ActiveJobs *jobs.ActiveJobs + PendingJobs *jobs.PendingJobs + ResourcePool *jobs.ResourcePool + QueueWorker *jobs.QueueWorker ProcessList *pr.ProcessList Config *Config } @@ -71,7 +87,7 @@ func prettyPrint(v interface{}) string { // Initializes resources and return a new handler // errors are fatal -func NewRESTHander(gitTag string) *RESTHandler { +func NewRESTHander(gitTag string, maxLocalCPUs string, maxLocalMemory string) *RESTHandler { apiName, exist := os.LookupEnv("API_NAME") if !exist { log.Warn("env variable API_NAME not set") @@ -82,6 +98,9 @@ func NewRESTHander(gitTag string) *RESTHandler { log.Warn("env variable REPO_URL not set") } + // Calculate resource limits once at startup + resourceLimits := newResourceLimits(maxLocalCPUs, maxLocalMemory) + // working with pointers here so as not to copy large templates, yamls, and ActiveJobs config := RESTHandler{ Name: apiName, @@ -101,6 +120,7 @@ func NewRESTHander(gitTag string) *RESTHandler { Config: &Config{ AdminRoleName: os.Getenv("AUTH_ADMIN_ROLE"), ServiceRoleName: os.Getenv("AUTH_SERVICE_ROLE"), + ResourceLimits: resourceLimits, }, } @@ -159,6 +179,15 @@ func NewRESTHander(gitTag string) *RESTHandler { ac.Jobs = 
make(map[string]*jobs.Job) config.ActiveJobs = &ac + // Setup Pending Jobs queue for async jobs waiting for resources + config.PendingJobs = jobs.NewPendingJobs() + + // Setup Resource Pool for tracking CPU/memory availability + config.ResourcePool = jobs.NewResourcePool(resourceLimits.MaxCPUs, resourceLimits.MaxMemory) + + // Setup Queue Worker to process pending jobs + config.QueueWorker = jobs.NewQueueWorker(config.PendingJobs, config.ResourcePool) + config.MessageQueue = &jobs.MessageQueue{ StatusChan: make(chan jobs.StatusMessage, 500), JobDone: make(chan jobs.Job, 1), @@ -166,7 +195,7 @@ func NewRESTHander(gitTag string) *RESTHandler { // Create local logs directory if not exist pluginsDir := os.Getenv("PLUGINS_DIR") // We already know this env variable exist because it is being checked in plguinsInit function - processList, err := pr.LoadProcesses(pluginsDir) + processList, err := pr.LoadProcesses(pluginsDir, resourceLimits.MaxCPUs, resourceLimits.MaxMemory) if err != nil { log.Fatal(err) } @@ -233,3 +262,37 @@ func NewStorageService(providerType string) (*s3.S3, error) { return nil, fmt.Errorf("unsupported storage provider type") } } + +// newResourceLimits creates ResourceLimits from the provided values. +// Values come from CLI flags which already have env var fallback via resolveValue(). +// Falls back to 80% of system CPUs and 8GB memory if not specified. 
+func newResourceLimits(maxLocalCPUsStr string, maxLocalMemoryStr string) *ResourceLimits { + numCPUs := float32(runtime.NumCPU()) + + // Default to 80% of system CPUs + maxCPUs := numCPUs * 0.8 + if maxLocalCPUsStr != "" { + if parsed, err := strconv.ParseFloat(maxLocalCPUsStr, 32); err == nil { + maxCPUs = float32(parsed) + } else { + log.Warnf("Invalid MAX_LOCAL_CPUS value: %s, using default %.2f", maxLocalCPUsStr, maxCPUs) + } + } + + // Default to 8GB + maxMemory := 8192 + if maxLocalMemoryStr != "" { + if parsed, err := strconv.Atoi(maxLocalMemoryStr); err == nil { + maxMemory = parsed + } else { + log.Warnf("Invalid MAX_LOCAL_MEMORY value: %s, using default %d", maxLocalMemoryStr, maxMemory) + } + } + + log.Infof("ResourceLimits initialized: maxCPUs=%.2f, maxMemory=%dMB", maxCPUs, maxMemory) + + return &ResourceLimits{ + MaxCPUs: maxCPUs, + MaxMemory: maxMemory, + } +} diff --git a/api/handlers/execution_mode.go b/api/handlers/execution_mode.go new file mode 100644 index 0000000..8b92583 --- /dev/null +++ b/api/handlers/execution_mode.go @@ -0,0 +1,83 @@ +package handlers + +import ( + "app/utils" + "strings" +) + +// ExecutionModeResult contains the determined execution mode and whether +// a Prefer header preference was honored (for Preference-Applied response header) +type ExecutionModeResult struct { + Mode string // "sync-execute" or "async-execute" + PreferenceApplied string // Value for Preference-Applied header, empty if none +} + +// DetermineExecutionMode implements OGC API - Processes execution mode logic +// per Requirements 25 and 26, and Recommendation 12A. 
+// +// Decision matrix: +// - No Prefer header + async-only process → async (Req 25A) +// - No Prefer header + sync-only process → sync (Req 25B) +// - No Prefer header + both modes → sync (Req 25C - default) +// - Prefer: respond-async + async-only → async (Req 26A) +// - Prefer: respond-async + sync-only → sync (Req 26B - ignore preference) +// - Prefer: respond-async + both modes → async (Req 26C + Rec 12A - honor preference) +func DetermineExecutionMode(jobControlOptions []string, preferHeader string) ExecutionModeResult { + supportsSync := utils.StringInSlice("sync-execute", jobControlOptions) + supportsAsync := utils.StringInSlice("async-execute", jobControlOptions) + wantsAsync := parseRespondAsyncPreference(preferHeader) + + result := ExecutionModeResult{} + + // Case 1: Process only supports async (Req 25A, 26A) + if supportsAsync && !supportsSync { + result.Mode = "async-execute" + return result + } + + // Case 2: Process only supports sync (Req 25B, 26B) + if supportsSync && !supportsAsync { + result.Mode = "sync-execute" + // If client requested async but we can only do sync, no preference was applied + return result + } + + // Case 3: Process supports both modes + if wantsAsync { + // Req 26C + Rec 12A: Honor the async preference + result.Mode = "async-execute" + result.PreferenceApplied = "respond-async" + return result + } + + // Req 25C: Default to sync when no preference given + result.Mode = "sync-execute" + return result +} + +// parseRespondAsyncPreference checks if the Prefer header contains "respond-async" +// The Prefer header can contain multiple comma or space separated preferences. 
+// Example: "respond-async, wait=10" or "respond-async" +func parseRespondAsyncPreference(preferHeader string) bool { + if preferHeader == "" { + return false + } + + // Prefer header values can be comma-separated + // Each preference can have parameters separated by semicolons + // We're looking for "respond-async" token + preferences := strings.Split(preferHeader, ",") + for _, pref := range preferences { + // Trim whitespace and get the preference name (before any parameters) + pref = strings.TrimSpace(pref) + // Handle parameters like "respond-async; wait=10" + parts := strings.SplitN(pref, ";", 2) + prefName := strings.TrimSpace(parts[0]) + + if prefName == "respond-async" { + return true + } + } + + return false +} diff --git a/api/handlers/handlers.go b/api/handlers/handlers.go index 551cfd7..a02898e 100644 --- a/api/handlers/handlers.go +++ b/api/handlers/handlers.go @@ -173,7 +173,7 @@ func (rh *RESTHandler) Execution(c echo.Context) error { } if rh.Config.AuthLevel > 0 { - roles := strings.Split(c.Request().Header.Get("X-ProcessAPI-User-Roles"), ",") + roles := strings.Split(c.Request().Header.Get("X-SEPEX-User-Roles"), ",") // admins are allowed to execute all processes, else you need to have a role with same name as processId if !utils.StringInSlice(rh.Config.AdminRoleName, roles) && !utils.StringInSlice(processID, roles) { @@ -211,7 +211,11 @@ func (rh *RESTHandler) Execution(c echo.Context) error { cmd = append(cmd, string(jsonParams)) } - mode := p.Info.JobControlOptions[0] + // Determine execution mode based on process capabilities and client preference + // per OGC API - Processes Requirements 25, 26 and Recommendation 12A + preferHeader := c.Request().Header.Get("Prefer") + modeResult := DetermineExecutionMode(p.Info.JobControlOptions, preferHeader) + mode := modeResult.Mode host := p.Host.Type // ----------- Process related setup is complete at this point --------- @@ -225,7 +229,7 @@ func (rh *RESTHandler) Execution(c echo.Context) error { // 
params.Inputs["resultsCallbackUri"] = fmt.Sprintf("%s/jobs/%s/results_update", os.Getenv("API_URL_PUBLIC"), jobID) // } - submitter := c.Request().Header.Get("X-ProcessAPI-User-Email") + submitter := c.Request().Header.Get("X-SEPEX-User-Email") var j jobs.Job switch host { case "docker": @@ -242,6 +246,8 @@ func (rh *RESTHandler) Execution(c echo.Context) error { StorageSvc: rh.StorageSvc, DB: rh.DB, DoneChan: rh.MessageQueue.JobDone, + ResourcePool: rh.ResourcePool, + IsSync: mode == "sync-execute", } case "aws-batch": @@ -269,24 +275,40 @@ func (rh *RESTHandler) Execution(c echo.Context) error { EnvVars: p.Config.EnvVars, Cmd: cmd, ProcessVersion: p.Info.Version, + Resources: jobs.Resources(p.Config.Resources), StorageSvc: rh.StorageSvc, DB: rh.DB, DoneChan: rh.MessageQueue.JobDone, + ResourcePool: rh.ResourcePool, + IsSync: mode == "sync-execute", } } - // Create job + // Create job (reserves resources for sync docker/subprocess jobs) err = j.Create() if err != nil { + if err.Error() == "resources unavailable" { + // Only sync jobs can fail with this error + return c.JSON(http.StatusServiceUnavailable, errResponse{ + Message: "Server resources are backlogged for local job execution. 
Use async-execute mode (if available for this process) or retry later.", + }) + } return c.JSON(http.StatusInternalServerError, errResponse{Message: fmt.Sprintf("submission error %s", err.Error())}) } // Add to active jobs rh.ActiveJobs.Add(&j) + // Add Preference-Applied header if a preference was honored (Rec 14) + if modeResult.PreferenceApplied != "" { + c.Response().Header().Set("Preference-Applied", modeResult.PreferenceApplied) + } + resp := jobResponse{ProcessID: j.ProcessID(), Type: "process", JobID: jobID, Status: j.CurrentStatus()} switch mode { case "sync-execute": + j.Run() + // wgRun.Add(1) is called in Create() so WaitForRunCompletion() blocks correctly j.WaitForRunCompletion() resp.Status = j.CurrentStatus() @@ -307,6 +329,16 @@ return c.JSON(http.StatusInternalServerError, resp) } case "async-execute": + // Only queue Docker/Subprocess jobs that need local resources + // AWS Batch auto-starts in Create(), no queuing needed + switch j.(type) { + case *jobs.DockerJob, *jobs.SubprocessJob: + // Track queued resources, add to queue, and notify worker + res := j.GetResources() + rh.ResourcePool.AddQueued(res.CPUs, res.Memory) + rh.PendingJobs.Enqueue(&j) + rh.QueueWorker.NotifyNewJob() + } resp.Status = j.CurrentStatus() return c.JSON(http.StatusCreated, resp) default: @@ -324,25 +356,37 @@ // @Router /jobs/{jobID} [delete] // Does not produce HTML func (rh *RESTHandler) JobDismissHandler(c echo.Context) error { - jobID := c.Param("jobID") - if j, ok := rh.ActiveJobs.Jobs[jobID]; ok { - if rh.Config.AuthLevel > 0 { - roles := strings.Split(c.Request().Header.Get("X-ProcessAPI-User-Roles"), ",") + // 1. 
Check if job exists in active jobs + jobID := c.Param("jobID") + j, ok := rh.ActiveJobs.Jobs[jobID] + if !ok { + return c.JSON(http.StatusNotFound, errResponse{Message: fmt.Sprintf("job %s not in the active jobs list", jobID)}) + } - if (*j).SUBMITTER() != c.Request().Header.Get("X-ProcessAPI-User-Email") && !utils.StringInSlice(rh.Config.AdminRoleName, roles) { - return c.JSON(http.StatusForbidden, errResponse{Message: "Forbidden"}) - } + // 2. Check auth + if rh.Config.AuthLevel > 0 { + roles := strings.Split(c.Request().Header.Get("X-SEPEX-User-Roles"), ",") + if (*j).SUBMITTER() != c.Request().Header.Get("X-SEPEX-User-Email") && !utils.StringInSlice(rh.Config.AdminRoleName, roles) { + return c.JSON(http.StatusForbidden, errResponse{Message: "Forbidden"}) + } + } - err := (*j).Kill() - if err != nil { - return c.JSON(http.StatusBadRequest, errResponse{Message: err.Error()}) - } - return c.JSON(http.StatusOK, jobResponse{ProcessID: (*j).ProcessID(), Type: "process", JobID: jobID, Status: (*j).CurrentStatus(), Message: fmt.Sprintf("job %s dismissed", jobID)}) + // 3. Remove from pending queue if it exists there (job hasn't started yet) + removed := rh.PendingJobs.Remove(jobID) + if removed != nil { + // Job was in queue - update queued resource tracking + res := (*removed).GetResources() + rh.ResourcePool.RemoveQueued(res.CPUs, res.Memory) } - return c.JSON(http.StatusNotFound, errResponse{Message: fmt.Sprintf("job %s not in the active jobs list", jobID)}) + + // 4. 
Kill the job + err := (*j).Kill() + if err != nil { + return c.JSON(http.StatusBadRequest, errResponse{Message: err.Error()}) + } + return c.JSON(http.StatusOK, jobResponse{ProcessID: (*j).ProcessID(), Type: "process", JobID: jobID, Status: (*j).CurrentStatus(), Message: fmt.Sprintf("job %s dismissed", jobID)}) } // @Summary Job Status @@ -595,10 +638,10 @@ func (rh *RESTHandler) ListJobsHandler(c echo.Context) error { } if rh.Config.AuthLevel > 1 { // changed for hotfix, should be > 0 when clients are updated - roles := strings.Split(c.Request().Header.Get("X-ProcessAPI-User-Roles"), ",") + roles := strings.Split(c.Request().Header.Get("X-SEPEX-User-Roles"), ",") if !utils.StringInSlice(rh.Config.AdminRoleName, roles) { - submitters = c.Request().Header.Get("X-ProcessAPI-User-Email") + submitters = c.Request().Header.Get("X-SEPEX-User-Email") } } @@ -655,7 +698,7 @@ func (rh *RESTHandler) ListJobsHandler(c echo.Context) error { // Time must be in RFC3339(ISO) format func (rh *RESTHandler) JobStatusUpdateHandler(c echo.Context) error { if rh.Config.AuthLevel > 0 { - roles := strings.Split(c.Request().Header.Get("X-ProcessAPI-User-Roles"), ",") + roles := strings.Split(c.Request().Header.Get("X-SEPEX-User-Roles"), ",") // only service accounts or admins can post status updates if !utils.StringInSlice(rh.Config.ServiceRoleName, roles) && !utils.StringInSlice(rh.Config.AdminRoleName, roles) { @@ -725,3 +768,61 @@ func (rh *RESTHandler) JobStatusUpdateHandler(c echo.Context) error { // } // } + +// resourcesResponse provides resource utilization data for JSON API and HTML rendering +type resourcesResponse struct { + UsedCPUs float32 `json:"usedCPUs"` + UsedMemory int `json:"usedMemory"` + QueuedCPUs float32 `json:"queuedCPUs"` + QueuedMemory int `json:"queuedMemory"` + MaxCPUs float32 `json:"maxCPUs"` + MaxMemory int `json:"maxMemory"` + UsedCPUsPct float32 `json:"usedCPUsPct"` + QueuedCPUsPct float32 `json:"queuedCPUsPct"` + UsedMemPct float32 `json:"usedMemPct"` + 
QueuedMemPct float32 `json:"queuedMemPct"` +} + +// @Summary Resource Status +// @Description Returns current resource utilization for local job scheduling +// @Tags admin +// @Accept */* +// @Produce json +// @Success 200 {object} resourcesResponse +// @Router /admin/resources [get] +func (rh *RESTHandler) ResourceStatusHandler(c echo.Context) error { + err := validateFormat(c) + if err != nil { + return err + } + + status := rh.ResourcePool.GetStatus() + + resources := resourcesResponse{ + UsedCPUs: status.UsedCPUs, + UsedMemory: status.UsedMemory, + QueuedCPUs: status.QueuedCPUs, + QueuedMemory: status.QueuedMemory, + MaxCPUs: status.MaxCPUs, + MaxMemory: status.MaxMemory, + } + + if status.MaxCPUs > 0 { + resources.UsedCPUsPct = (status.UsedCPUs / status.MaxCPUs) * 100 + resources.QueuedCPUsPct = (status.QueuedCPUs / status.MaxCPUs) * 100 + } + if status.MaxMemory > 0 { + resources.UsedMemPct = (float32(status.UsedMemory) / float32(status.MaxMemory)) * 100 + resources.QueuedMemPct = (float32(status.QueuedMemory) / float32(status.MaxMemory)) * 100 + } + + links := []link{ + {Href: "/admin/resources", Rel: "self", Title: "this document"}, + } + + output := make(map[string]interface{}) + output["resources"] = resources + output["links"] = links + + return prepareResponse(c, http.StatusOK, "resourceStatus", output) +} diff --git a/api/handlers/processes_handlers.go b/api/handlers/processes_handlers.go index ab34c0c..e57e7e1 100644 --- a/api/handlers/processes_handlers.go +++ b/api/handlers/processes_handlers.go @@ -113,7 +113,7 @@ func (rh *RESTHandler) ProcessDescribeHandler(c echo.Context) error { func (rh *RESTHandler) AddProcessHandler(c echo.Context) error { if rh.Config.AuthLevel > 0 { - roles := strings.Split(c.Request().Header.Get("X-ProcessAPI-User-Roles"), ",") + roles := strings.Split(c.Request().Header.Get("X-SEPEX-User-Roles"), ",") // non-admins are not allowed if !utils.StringInSlice(rh.Config.AdminRoleName, roles) { @@ -138,7 +138,7 @@ func (rh 
*RESTHandler) AddProcessHandler(c echo.Context) error { return prepareResponse(c, http.StatusBadRequest, "error", errResponse{Message: "Process ID mismatch", HTTPStatus: http.StatusBadRequest}) } - err = newProcess.Validate() + err = newProcess.Validate(rh.Config.ResourceLimits.MaxCPUs, rh.Config.ResourceLimits.MaxMemory) if err != nil { return c.JSON(http.StatusBadRequest, errResponse{Message: err.Error()}) } @@ -175,7 +175,7 @@ func (rh *RESTHandler) AddProcessHandler(c echo.Context) error { func (rh *RESTHandler) UpdateProcessHandler(c echo.Context) error { if rh.Config.AuthLevel > 0 { - roles := strings.Split(c.Request().Header.Get("X-ProcessAPI-User-Roles"), ",") + roles := strings.Split(c.Request().Header.Get("X-SEPEX-User-Roles"), ",") // non-admins are not allowed if !utils.StringInSlice(rh.Config.AdminRoleName, roles) { @@ -200,7 +200,7 @@ func (rh *RESTHandler) UpdateProcessHandler(c echo.Context) error { return c.JSON(http.StatusBadRequest, errResponse{Message: "Process ID mismatch"}) } - err = updatedProcess.Validate() + err = updatedProcess.Validate(rh.Config.ResourceLimits.MaxCPUs, rh.Config.ResourceLimits.MaxMemory) if err != nil { return c.JSON(http.StatusBadRequest, errResponse{Message: err.Error()}) } @@ -246,7 +246,7 @@ func (rh *RESTHandler) UpdateProcessHandler(c echo.Context) error { func (rh *RESTHandler) DeleteProcessHandler(c echo.Context) error { if rh.Config.AuthLevel > 0 { - roles := strings.Split(c.Request().Header.Get("X-ProcessAPI-User-Roles"), ",") + roles := strings.Split(c.Request().Header.Get("X-SEPEX-User-Roles"), ",") // non-admins are not allowed if !utils.StringInSlice(rh.Config.AdminRoleName, roles) { diff --git a/api/jobs/aws_batch_jobs.go b/api/jobs/aws_batch_jobs.go index d2b40dc..909abc2 100644 --- a/api/jobs/aws_batch_jobs.go +++ b/api/jobs/aws_batch_jobs.go @@ -56,6 +56,7 @@ type AWSBatchJob struct { DB Database StorageSvc *s3.S3 DoneChan chan Job + Resources // AWS Batch manages its own resources, but field needed for 
interface } func (j *AWSBatchJob) WaitForRunCompletion() { @@ -86,6 +87,23 @@ func (j *AWSBatchJob) IMAGE() string { return j.Image } +// Not used anywhere but needed for interface. +func (j *AWSBatchJob) GetResources() Resources { + return j.Resources +} + +// Run is a no-op for AWS Batch jobs since they auto-start in Create() +func (j *AWSBatchJob) Run() { + // AWS Batch jobs are submitted and start running automatically via the batch service + // No additional action needed here +} + +// IsSyncJob returns false for AWS Batch jobs. +// AWS Batch manages its own resources, so from local resource pool perspective, they're always async. +func (j *AWSBatchJob) IsSyncJob() bool { + return false +} + // Update container logs // Fetches Container logs from CloudWatch. func (j *AWSBatchJob) UpdateProcessLogs() (err error) { diff --git a/api/jobs/docker_jobs.go b/api/jobs/docker_jobs.go index 88763be..c86a205 100644 --- a/api/jobs/docker_jobs.go +++ b/api/jobs/docker_jobs.go @@ -23,6 +23,8 @@ type DockerJob struct { wg sync.WaitGroup // Used for monitoring running complete for sync jobs wgRun sync.WaitGroup + // closeOnce ensures Close() body executes exactly once + closeOnce sync.Once UUID string `json:"jobID"` ContainerID string @@ -40,9 +42,11 @@ type DockerJob struct { logFile *os.File Resources - DB Database - StorageSvc *s3.S3 - DoneChan chan Job + DB Database + StorageSvc *s3.S3 + DoneChan chan Job + ResourcePool *ResourcePool + IsSync bool } func (j *DockerJob) WaitForRunCompletion() { @@ -73,6 +77,10 @@ func (j *DockerJob) IMAGE() string { return j.Image } +func (j *DockerJob) GetResources() Resources { + return j.Resources +} + // Update container logs func (j *DockerJob) UpdateProcessLogs() (err error) { // If old status is one of the terminated status, close has already been called and container logs fetched, container killed @@ -202,6 +210,21 @@ func (j *DockerJob) initLogger() error { } func (j *DockerJob) Create() error { + // Only reserve resources for 
sync jobs at creation time + // Async jobs will have resources reserved when QueueWorker starts them + if j.IsSync { + if !j.ResourcePool.TryReserve(j.Resources.CPUs, j.Resources.Memory) { + return fmt.Errorf("resources unavailable") + } + } + + // Track if creation succeeded to handle cleanup on error + success := false + defer func() { + if !success && j.IsSync { + j.ResourcePool.Release(j.Resources.CPUs, j.Resources.Memory) + } + }() err := j.initLogger() if err != nil { @@ -221,31 +244,35 @@ func (j *DockerJob) Create() error { } j.NewStatusUpdate(ACCEPTED, time.Time{}) + + // Increment wgRun here so WaitForRunCompletion() blocks + // even if QueueWorker hasn't called StartRun() yet j.wgRun.Add(1) - go j.Run() + + success = true return nil } -func (j *DockerJob) Run() { - - // Helper function to check if context is cancelled. - isCancelled := func() bool { - select { - case <-j.ctx.Done(): - j.logger.Info("Context cancelled.") - return true - default: - return false - } - } +func (j *DockerJob) IsSyncJob() bool { + return j.IsSync +} - // defers are executed in LIFO order - // swap the order of following if results are posted/written by the container, and run close as a coroutine - defer j.wgRun.Done() +func (j *DockerJob) Run() { + // Single consolidated defer for all cleanup operations. + // Order of operations: + // 1. Recover from panic (if any) and mark job as FAILED + // 2. Release resources - free CPU/memory for next job in queue + // 3. Close() - cleanup process, logs, remove from ActiveJobs + // (closeOnce guarantees this only executes once, even if Kill() also called Close()) + // 4. 
wgRun.Done() - unblock sync job waiters after results are available defer func() { - if !isCancelled() { - j.Close() + if r := recover(); r != nil { + j.logger.Errorf("Run() panicked: %v", r) + j.NewStatusUpdate(FAILED, time.Time{}) } + j.ResourcePool.Release(j.Resources.CPUs, j.Resources.Memory) + j.Close() + j.wgRun.Done() }() c, err := controllers.NewDockerController() @@ -293,8 +320,11 @@ func (j *DockerJob) Run() { j.ContainerID = containerID - if isCancelled() { + // Check if job was cancelled (Kill() was called) before waiting for container + select { + case <-j.ctx.Done(): return + default: } // wait for process to finish @@ -330,9 +360,12 @@ func (j *DockerJob) Kill() error { // If a dismiss status is updated the job is considered dismissed at this point // Close being graceful or not does not matter. - defer func() { - go j.Close() - }() + // Cancel context to signal Run() to exit early if still executing. + // Close() is safe to call from both here and Run()'s defer because + // closeOnce guarantees the cleanup body executes exactly once. + j.ctxCancel() + + go j.Close() return nil } @@ -423,60 +456,66 @@ func (j *DockerJob) RunFinished() { // Write final logs, cancelCtx func (j *DockerJob) Close() { - - j.logger.Info("Starting closing routine.") - // to do: add panic recover to remove job from active jobs even if following panics - j.ctxCancel() // Signal Run function to terminate if running - - if j.ContainerID != "" { // Container related cleanups if container exists - c, err := controllers.NewDockerController() - if err != nil { - j.logger.Errorf("Could not create controller. Error: %s", err.Error()) - } else { - containerLogs, err := c.ContainerLog(context.TODO(), j.ContainerID) - if err != nil { - j.logger.Errorf("Could not fetch container logs. 
Error: %s", err.Error()) - } - - file, err := os.Create(fmt.Sprintf("%s/%s.process.jsonl", os.Getenv("TMP_JOB_LOGS_DIR"), j.UUID)) + // closeOnce.Do() ensures this cleanup runs exactly once, even if Close() is called + // multiple times concurrently. This allows for easier development. + // + // How sync.Once works: + // - First caller: acquires internal lock, executes the function, marks done + // - Concurrent/subsequent callers: see done flag, return immediately without executing + j.closeOnce.Do(func() { + j.logger.Info("Starting closing routine.") + j.ctxCancel() // Signal Run function to terminate if running + + if j.ContainerID != "" { // Container related cleanups if container exists + c, err := controllers.NewDockerController() if err != nil { - j.logger.Errorf("Could not create process logs file. Error: %s", err.Error()) - return - } - - writer := bufio.NewWriter(file) - - for i, line := range containerLogs { - if i != len(containerLogs)-1 { - _, err = writer.WriteString(line + "\n") - } else { - _, err = writer.WriteString(line) + j.logger.Errorf("Could not create controller. Error: %s", err.Error()) + } else { + containerLogs, err := c.ContainerLog(context.TODO(), j.ContainerID) + if err != nil { + j.logger.Errorf("Could not fetch container logs. Error: %s", err.Error()) } + + file, err := os.Create(fmt.Sprintf("%s/%s.process.jsonl", os.Getenv("TMP_JOB_LOGS_DIR"), j.UUID)) if err != nil { - j.logger.Errorf("Could not write log %s to file.", line) + j.logger.Errorf("Could not create process logs file. 
Error: %s", err.Error()) + return } - } - writer.Flush() - file.Close() + writer := bufio.NewWriter(file) + + for i, line := range containerLogs { + if i != len(containerLogs)-1 { + _, err = writer.WriteString(line + "\n") + } else { + _, err = writer.WriteString(line) + } + if err != nil { + j.logger.Errorf("Could not write log %s to file.", line) + } + } - err = c.ContainerRemove(context.TODO(), j.ContainerID) - if err != nil { - j.logger.Errorf("Could not remove container. Error: %s", err.Error()) + writer.Flush() + file.Close() + + err = c.ContainerRemove(context.TODO(), j.ContainerID) + if err != nil { + j.logger.Errorf("Could not remove container. Error: %s", err.Error()) + } } } - } - j.DoneChan <- j // At this point job can be safely removed from active jobs - - go func() { - j.wg.Wait() // wait if other routines like metadata are running - j.logFile.Close() - UploadLogsToStorage(j.StorageSvc, j.UUID, j.ProcessName) - // It is expected that logs will be requested multiple times for a recently finished job - // so we are waiting for one hour to before deleting the local copy - // so that we can avoid repetitive request to storage service. - // If the server shutdown, these files would need to be manually deleted - time.Sleep(time.Hour) - DeleteLocalLogs(j.StorageSvc, j.UUID, j.ProcessName) - }() + j.DoneChan <- j // At this point job can be safely removed from active jobs + + go func() { + j.wg.Wait() // wait if other routines like metadata are running + j.logFile.Close() + UploadLogsToStorage(j.StorageSvc, j.UUID, j.ProcessName) + // It is expected that logs will be requested multiple times for a recently finished job + // so we are waiting for one hour to before deleting the local copy + // so that we can avoid repetitive request to storage service. 
+ // If the server shutdown, these files would need to be manually deleted + time.Sleep(time.Hour) + DeleteLocalLogs(j.StorageSvc, j.UUID, j.ProcessName) + }() + }) } diff --git a/api/jobs/jobs.go b/api/jobs/jobs.go index b219958..f8c223b 100644 --- a/api/jobs/jobs.go +++ b/api/jobs/jobs.go @@ -65,6 +65,16 @@ type Job interface { // Pefrom any cleanup such as cancelling context etc // It is the responsibility of whoever is updating the terminated status to also call Close() Close() + + // GetResources returns the CPU and memory resources for this job + GetResources() Resources + + // Run executes the job. Called by QueueWorker in a goroutine for Pending Jobs. + // Called by handler for sync jobs + Run() + + // IsSyncJob returns true if this is a synchronous job + IsSyncJob() bool } // JobRecord contains details about a job diff --git a/api/jobs/pending_jobs.go b/api/jobs/pending_jobs.go new file mode 100644 index 0000000..0e2c78f --- /dev/null +++ b/api/jobs/pending_jobs.go @@ -0,0 +1,87 @@ +package jobs + +import ( + "container/list" + "sync" +) + +// PendingJobs is a pure FIFO queue for jobs waiting to be executed. +// Only async Docker/Subprocess jobs that need local resource management go here. +// +// This is a pure data structure with no business logic - it just stores and +// retrieves jobs. Signaling and resource tracking are handled by QueueWorker +// and ResourcePool respectively. +// +// Uses a doubly-linked list + map for O(1) operations: +// - list.List: maintains FIFO order, O(1) insert/remove at ends +// - index map: jobID → list element pointer, O(1) lookup for Remove() +// +// Example: +// +// list: job1 ◄──► job2 ◄──► job3 +// ▲ +// index: {"uuid-2" → pointer} +// +// Remove("uuid-2"): +// 1. Map lookup: O(1) to find element +// 2. 
List remove: O(1) to update prev/next pointers +// Result: job1 ◄──► job3 +type PendingJobs struct { + list *list.List + index map[string]*list.Element + mu sync.Mutex +} + +// NewPendingJobs creates a new PendingJobs queue. +func NewPendingJobs() *PendingJobs { + return &PendingJobs{ + list: list.New(), + index: make(map[string]*list.Element), + } +} + +// Enqueue adds a job to the back of the queue. +func (pj *PendingJobs) Enqueue(j *Job) { + pj.mu.Lock() + defer pj.mu.Unlock() + + elem := pj.list.PushBack(j) + pj.index[(*j).JobID()] = elem +} + +// Peek returns the job at the front of the queue without removing it. +// Returns nil if the queue is empty. +func (pj *PendingJobs) Peek() *Job { + pj.mu.Lock() + defer pj.mu.Unlock() + + elem := pj.list.Front() + if elem == nil { + return nil + } + + return elem.Value.(*Job) +} + +// Remove removes a job by ID from anywhere in the queue. +// Returns the removed job, or nil if not found. +// O(1) lookup via map, O(1) removal from doubly-linked list. +func (pj *PendingJobs) Remove(jobID string) *Job { + pj.mu.Lock() + defer pj.mu.Unlock() + + elem, ok := pj.index[jobID] + if !ok { + return nil + } + + delete(pj.index, jobID) + return pj.list.Remove(elem).(*Job) +} + +// Len returns the number of jobs in the queue. +func (pj *PendingJobs) Len() int { + pj.mu.Lock() + defer pj.mu.Unlock() + return pj.list.Len() +} diff --git a/api/jobs/queue_worker.go b/api/jobs/queue_worker.go new file mode 100644 index 0000000..593ab08 --- /dev/null +++ b/api/jobs/queue_worker.go @@ -0,0 +1,105 @@ +package jobs + +import ( + "sync" + + log "github.com/sirupsen/logrus" +) + +// QueueWorker is the scheduler that starts pending jobs when resources are available. 
+// +// Responsibilities: +// - Listens for signals indicating work may be available +// - Attempts to start jobs from PendingJobs queue +// - Coordinates with ResourcePool for resource reservation +// - Moves resources from "queued" to "used" when jobs start +// +// Event-driven: wakes on new job signal or resource release signal. +type QueueWorker struct { + pendingJobs *PendingJobs + resourcePool *ResourcePool + workSignal chan struct{} // Signals that new work may be available + shutdown chan struct{} + wg sync.WaitGroup +} + +// NewQueueWorker creates a new QueueWorker. +func NewQueueWorker(pendingJobs *PendingJobs, resourcePool *ResourcePool) *QueueWorker { + return &QueueWorker{ + pendingJobs: pendingJobs, + resourcePool: resourcePool, + workSignal: make(chan struct{}, 1), + shutdown: make(chan struct{}), + } +} + +// Start begins the queue processing goroutine. +func (qw *QueueWorker) Start() { + qw.wg.Add(1) + go qw.processLoop() + log.Info("QueueWorker started") +} + +// Stop signals the queue worker to shutdown and waits for it to finish. +func (qw *QueueWorker) Stop() { + close(qw.shutdown) + qw.wg.Wait() + log.Info("QueueWorker stopped") +} + +// NotifyNewJob signals that a new job has been enqueued. +// Called by Handler after adding a job to PendingJobs. +func (qw *QueueWorker) NotifyNewJob() { + select { + case qw.workSignal <- struct{}{}: + default: + // Channel already has a pending signal; worker will process all jobs when it wakes + } +} + +// processLoop waits for signals and processes pending jobs. +func (qw *QueueWorker) processLoop() { + defer qw.wg.Done() + + for { + select { + case <-qw.shutdown: + log.Info("QueueWorker shutting down") + return + case <-qw.workSignal: + qw.tryStartJobs() + case <-qw.resourcePool.ReleaseChan(): + qw.tryStartJobs() + } + } +} + +// tryStartJobs processes pending jobs until queue is empty or resources unavailable. 
+func (qw *QueueWorker) tryStartJobs() { + for { + job := qw.pendingJobs.Peek() + if job == nil { + return + } + + res := (*job).GetResources() + if !qw.resourcePool.TryReserve(res.CPUs, res.Memory) { + return // Not enough resources, wait for release + } + + // Remove the same job we peeked; it may have been dismissed concurrently, so can't use dequeue directly. + removed := qw.pendingJobs.Remove((*job).JobID()) + if removed == nil { + // Job disappeared between peek and remove; release reservation and retry. + qw.resourcePool.Release(res.CPUs, res.Memory) + continue + } + + // Job is leaving the queue and starting - update resource tracking. + // Resources removed from "queued" (TryReserve already added to "used"). + qw.resourcePool.RemoveQueued(res.CPUs, res.Memory) + + log.Infof("Starting job %s", (*removed).JobID()) + go (*removed).Run() + } +} diff --git a/api/jobs/resource_pool.go b/api/jobs/resource_pool.go new file mode 100644 index 0000000..efea332 --- /dev/null +++ b/api/jobs/resource_pool.go @@ -0,0 +1,142 @@ +package jobs + +import ( + "sync" + + log "github.com/sirupsen/logrus" +) + +// StatusResponse contains current resource utilization. +type StatusResponse struct { + // Running job resources + UsedCPUs float32 + UsedMemory int + // Queued job resources (waiting in PendingJobs) + QueuedCPUs float32 + QueuedMemory int + // Maximum available resources + MaxCPUs float32 + MaxMemory int +} + +// ResourcePool tracks available vs used resources for job scheduling. +// Uses mutex for thread-safe access to shared state. +type ResourcePool struct { + mu sync.RWMutex + + maxCPUs float32 + maxMemory int // in MB + + usedCPUs float32 + usedMemory int + + queuedCPUs float32 + queuedMemory int + + releaseNotify chan struct{} // Signals QueueWorker when resources are released +} + +// NewResourcePool creates a ResourcePool with the given max limits. 
+// The limits should come from the centralized config to ensure consistency +// between resource pool and process validation. +func NewResourcePool(maxCPUs float32, maxMemory int) *ResourcePool { + log.Infof("ResourcePool initialized: maxCPUs=%.2f, maxMemory=%dMB", maxCPUs, maxMemory) + + return &ResourcePool{ + maxCPUs: maxCPUs, + maxMemory: maxMemory, + releaseNotify: make(chan struct{}, 1), + } +} + +// TryReserve attempts to reserve resources for a running job. +// Returns true if successful, false if not enough resources available. +func (rp *ResourcePool) TryReserve(cpus float32, memory int) bool { + rp.mu.Lock() + defer rp.mu.Unlock() + + if rp.usedCPUs+cpus <= rp.maxCPUs && rp.usedMemory+memory <= rp.maxMemory { + rp.usedCPUs += cpus + rp.usedMemory += memory + log.Debugf("Resources reserved: cpus=%.2f, memory=%dMB. Used: cpus=%.2f/%.2f, memory=%d/%dMB", + cpus, memory, rp.usedCPUs, rp.maxCPUs, rp.usedMemory, rp.maxMemory) + return true + } + return false +} + +// Release returns resources to the pool when a job finishes. +func (rp *ResourcePool) Release(cpus float32, memory int) { + rp.mu.Lock() + rp.usedCPUs -= cpus + rp.usedMemory -= memory + + // Clamp to zero (safety check) + if rp.usedCPUs < 0 { + rp.usedCPUs = 0 + } + if rp.usedMemory < 0 { + rp.usedMemory = 0 + } + + log.Debugf("Resources released: cpus=%.2f, memory=%dMB. Used: cpus=%.2f/%.2f, memory=%d/%dMB", + cpus, memory, rp.usedCPUs, rp.maxCPUs, rp.usedMemory, rp.maxMemory) + rp.mu.Unlock() + + // Signal QueueWorker that resources are available + select { + case rp.releaseNotify <- struct{}{}: + default: + } +} + +// AddQueued adds resources to the queued count when a job is enqueued to PendingJobs. +func (rp *ResourcePool) AddQueued(cpus float32, memory int) { + rp.mu.Lock() + defer rp.mu.Unlock() + + rp.queuedCPUs += cpus + rp.queuedMemory += memory + log.Debugf("Resources queued: cpus=%.2f, memory=%dMB. 
Queued: cpus=%.2f, memory=%dMB", + cpus, memory, rp.queuedCPUs, rp.queuedMemory) +} + +// RemoveQueued removes resources from the queued count when a job leaves PendingJobs. +func (rp *ResourcePool) RemoveQueued(cpus float32, memory int) { + rp.mu.Lock() + defer rp.mu.Unlock() + + rp.queuedCPUs -= cpus + rp.queuedMemory -= memory + + // Clamp to zero (safety check) + if rp.queuedCPUs < 0 { + rp.queuedCPUs = 0 + } + if rp.queuedMemory < 0 { + rp.queuedMemory = 0 + } + + log.Debugf("Resources dequeued: cpus=%.2f, memory=%dMB. Queued: cpus=%.2f, memory=%dMB", + cpus, memory, rp.queuedCPUs, rp.queuedMemory) +} + +// GetStatus returns current resource utilization. +func (rp *ResourcePool) GetStatus() StatusResponse { + rp.mu.RLock() + defer rp.mu.RUnlock() + + return StatusResponse{ + UsedCPUs: rp.usedCPUs, + UsedMemory: rp.usedMemory, + QueuedCPUs: rp.queuedCPUs, + QueuedMemory: rp.queuedMemory, + MaxCPUs: rp.maxCPUs, + MaxMemory: rp.maxMemory, + } +} + +// ReleaseChan returns the channel that signals when resources are released. 
+func (rp *ResourcePool) ReleaseChan() <-chan struct{} { + return rp.releaseNotify +} diff --git a/api/jobs/subprocess_jobs.go b/api/jobs/subprocess_jobs.go index 16a7bcc..5dfbd2a 100644 --- a/api/jobs/subprocess_jobs.go +++ b/api/jobs/subprocess_jobs.go @@ -22,6 +22,8 @@ type SubprocessJob struct { wg sync.WaitGroup // Used for monitoring running complete for sync jobs wgRun sync.WaitGroup + // closeOnce ensures Close() body executes exactly once + closeOnce sync.Once UUID string `json:"jobID"` PID string @@ -39,9 +41,11 @@ type SubprocessJob struct { logFile *os.File Resources - DB Database - StorageSvc *s3.S3 - DoneChan chan Job + DB Database + StorageSvc *s3.S3 + DoneChan chan Job + ResourcePool *ResourcePool + IsSync bool } func (j *SubprocessJob) WaitForRunCompletion() { @@ -68,6 +72,10 @@ func (j *SubprocessJob) CMD() []string { return j.Cmd } +func (j *SubprocessJob) GetResources() Resources { + return j.Resources +} + func (j *SubprocessJob) LogMessage(m string, level log.Level) { switch level { case 2: @@ -153,6 +161,22 @@ func (j *SubprocessJob) initLogger() error { } func (j *SubprocessJob) Create() error { + // Only reserve resources for sync jobs at creation time + // Async jobs will have resources reserved when QueueWorker starts them + if j.IsSync { + if !j.ResourcePool.TryReserve(j.Resources.CPUs, j.Resources.Memory) { + return fmt.Errorf("resources unavailable") + } + } + + // Track if creation succeeded to handle cleanup on error + success := false + defer func() { + if !success && j.IsSync { + j.ResourcePool.Release(j.Resources.CPUs, j.Resources.Memory) + } + }() + err := j.initLogger() if err != nil { return err @@ -171,29 +195,35 @@ func (j *SubprocessJob) Create() error { } j.NewStatusUpdate(ACCEPTED, time.Time{}) + + // Increment wgRun here so WaitForRunCompletion() blocks + // even if QueueWorker hasn't called StartRun() yet j.wgRun.Add(1) - go j.Run() + + success = true return nil } -func (j *SubprocessJob) Run() { - // Helper function to 
check if context is cancelled. - isCancelled := func() bool { - select { - case <-j.ctx.Done(): - j.logger.Info("Context cancelled.") - return true - default: - return false - } - } +func (j *SubprocessJob) IsSyncJob() bool { + return j.IsSync +} - // defers are executed in LIFO order - defer j.wgRun.Done() +func (j *SubprocessJob) Run() { + // Single consolidated defer for all cleanup operations. + // Order of operations: + // 1. Recover from panic (if any) and mark job as FAILED + // 2. Release resources - free CPU/memory for next job in queue + // 3. Close() - cleanup process, logs, remove from ActiveJobs + // (closeOnce guarantees this only executes once, even if Kill() also called Close()) + // 4. wgRun.Done() - unblock sync job waiters after results are available defer func() { - if !isCancelled() { - j.Close() + if r := recover(); r != nil { + j.logger.Errorf("Run() panicked: %v", r) + j.NewStatusUpdate(FAILED, time.Time{}) } + j.ResourcePool.Release(j.Resources.CPUs, j.Resources.Memory) + j.Close() + j.wgRun.Done() }() // Prepare the command @@ -230,8 +260,11 @@ func (j *SubprocessJob) Run() { j.PID = fmt.Sprintf("%d", j.execCmd.Process.Pid) j.NewStatusUpdate(RUNNING, time.Time{}) - if isCancelled() { + // Check if job was cancelled (Kill() was called) before waiting for process + select { + case <-j.ctx.Done(): return + default: } // Wait for the process to finish @@ -264,9 +297,12 @@ func (j *SubprocessJob) Kill() error { // If a dismiss status is updated the job is considered dismissed at this point // Close being graceful or not does not matter. - defer func() { - go j.Close() - }() + // Cancel context to signal Run() to exit early if still executing. + // Close() is safe to call from both here and Run()'s defer because + // closeOnce guarantees the cleanup body executes exactly once. 
+ j.ctxCancel() + + go j.Close() return nil } @@ -311,32 +347,39 @@ func (j *SubprocessJob) RunFinished() { // Write final logs, cancelCtx func (j *SubprocessJob) Close() { - j.logger.Info("Starting closing routine.") - // to do: add panic recover to remove job from active jobs even if following panics - j.ctxCancel() // Signal Run function to terminate if running - - // Following is not needed since we are using context to signal job termination - // if j.execCmd.Process != nil && j.execCmd.ProcessState == nil { - // // Process related cleanups if process state is nil meaning process is still running - // err := j.execCmd.Process.Kill() - // if err != nil { - // j.logger.Errorf("Could not kill process. Error: %s", err.Error()) - // } - // } - - j.DoneChan <- j // At this point job can be safely removed from active jobs - - go func() { - j.wg.Wait() // wait if other routines like metadata are running - j.logFile.Close() - UploadLogsToStorage(j.StorageSvc, j.UUID, j.ProcessName) - // It is expected that logs will be requested multiple times for a recently finished job - // so we are waiting for one hour to before deleting the local copy - // so that we can avoid repetitive request to storage service. - // If the server shutdown, these files would need to be manually deleted - time.Sleep(time.Hour) - DeleteLocalLogs(j.StorageSvc, j.UUID, j.ProcessName) - }() + // closeOnce.Do() ensures this cleanup runs exactly once, even if Close() is called + // multiple times concurrently. 
+	//
+	// How sync.Once works:
+	// - First caller: acquires internal lock, executes the function, marks done
+	// - Concurrent/subsequent callers: see done flag, return immediately without executing
+	j.closeOnce.Do(func() {
+		j.logger.Info("Starting closing routine.")
+		j.ctxCancel() // Signal Run function to terminate if running
+
+		// // Following is not needed since we are using context to signal job termination
+		// if j.execCmd.Process != nil && j.execCmd.ProcessState == nil {
+		// 	// Process related cleanups if process state is nil meaning process is still running
+		// 	err := j.execCmd.Process.Kill()
+		// 	if err != nil {
+		// 		j.logger.Errorf("Could not kill process. Error: %s", err.Error())
+		// 	}
+		// }
+
+		j.DoneChan <- j // At this point job can be safely removed from active jobs
+
+		go func() {
+			j.wg.Wait() // wait if other routines like metadata are running
+			j.logFile.Close()
+			UploadLogsToStorage(j.StorageSvc, j.UUID, j.ProcessName)
+			// It is expected that logs will be requested multiple times for a recently finished job
+			// so we are waiting for one hour before deleting the local copy
+			// so that we can avoid repetitive request to storage service. 
+ // If the server shutdown, these files would need to be manually deleted + time.Sleep(time.Hour) + DeleteLocalLogs(j.StorageSvc, j.UUID, j.ProcessName) + }() + }) } func (j *SubprocessJob) IMAGE() string { diff --git a/api/main.go b/api/main.go index 108a2ff..9869722 100644 --- a/api/main.go +++ b/api/main.go @@ -28,7 +28,7 @@ import ( var ( // Build-time version information - GitTag = "unknown" // will be injected at build-time + GitTag = "unknown" // will be injected at build-time ) var ( @@ -39,6 +39,8 @@ var ( logFile string authSvc string authLvl string + maxLocalCPUs string + maxLocalMemory string ) func init() { @@ -66,6 +68,8 @@ func init() { flag.StringVar(&logFile, "lf", resolveValue("LOG_FILE", "/.data/logs/api.jsonl"), "specify the log file") flag.StringVar(&authSvc, "au", resolveValue("AUTH_SERVICE", ""), "specify the auth service") flag.StringVar(&authLvl, "al", resolveValue("AUTH_LEVEL", "0"), "specify the authorization striction level") + flag.StringVar(&maxLocalCPUs, "mlc", resolveValue("MAX_LOCAL_CPUS", ""), "max CPUs for local jobs (default: 80% of system CPUs)") + flag.StringVar(&maxLocalMemory, "mlm", resolveValue("MAX_LOCAL_MEMORY_MB", ""), "max memory in MB for local jobs (default: 8192)") flag.Parse() } @@ -259,7 +263,7 @@ func main() { initPlugins() // Initialize resources - rh := handlers.NewRESTHander(GitTag) + rh := handlers.NewRESTHander(GitTag, maxLocalCPUs, maxLocalMemory) // todo: handle this error: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. 
Is the docker daemon running // todo: all non terminated job statuses should be updated to unknown // todo: all logs in the logs directory should be moved to storage @@ -267,6 +271,7 @@ func main() { // Goroutines go rh.StatusUpdateRoutine() go rh.JobCompletionRoutine() + rh.QueueWorker.Start() // Start() spawns its own goroutine and supports Stop() for graceful shutdown // Set server configuration e := echo.New() @@ -316,6 +321,9 @@ func main() { pg.PUT("/jobs/:jobID/status", rh.JobStatusUpdateHandler) // e.POST("/jobs/:jobID/results", rh.JobResultsUpdateHandler) + // Admin + e.GET("/admin/resources", rh.ResourceStatusHandler) + _, lw := initLogger() fmt.Println("Logging to", logFile) e.Use(middleware.LoggerWithConfig(middleware.LoggerConfig{ @@ -349,7 +357,11 @@ func main() { // Shutdown the server // By default, Docker provides a grace period of 10 seconds with the docker stop command. - // Kill any running docker containers/subprocess (clean up resources) + // Stop QueueWorker from starting new jobs + rh.QueueWorker.Stop() + + // Kill any running docker containers / subprocesses (clean up resources) + // Kill all active jobs // Send dismiss notice to all cloud jobs rh.ActiveJobs.KillAll() log.Info("kill command sent to all active jobs") diff --git a/api/processes/processes.go b/api/processes/processes.go index 06fad64..87331d3 100644 --- a/api/processes/processes.go +++ b/api/processes/processes.go @@ -227,13 +227,22 @@ func MarshallProcess(f string) (Process, error) { p.Host.Image = jdi.Image p.Config.Resources.Memory = jdi.Memory // although we are fetching this information but is not being used anywhere or reported to users p.Config.Resources.CPUs = jdi.VCPUs // although we are fetching this information but is not being used anywhere or reported to users + case "docker", "subprocess": + // Set default resources if not specified in config + if p.Config.Resources.CPUs == 0 { + p.Config.Resources.CPUs = 1.0 + } + if p.Config.Resources.Memory == 0 { + 
p.Config.Resources.Memory = 512 + } } return p, nil } -// Load all processes from yml files in the given directory and subdirectories -func LoadProcesses(dir string) (ProcessList, error) { +// Load all processes from yml files in the given directory and subdirectories. +// maxCPUs and maxMemory are resource limits for validating docker/subprocess processes. +func LoadProcesses(dir string, maxCPUs float32, maxMemory int) (ProcessList, error) { var pl ProcessList ymls, err := filepath.Glob(fmt.Sprintf("%s/*/*.yml", dir)) @@ -253,7 +262,7 @@ func LoadProcesses(dir string) (ProcessList, error) { log.Errorf("could not register process %s Error: %v", filepath.Base(y), err) continue } - err = p.Validate() + err = p.Validate(maxCPUs, maxMemory) if err != nil { log.Errorf("could not register process %s Error: %v", filepath.Base(y), err.Error()) continue @@ -273,7 +282,9 @@ func LoadProcesses(dir string) (ProcessList, error) { } // Validate checks if the Process has all required fields properly set. -func (p *Process) Validate() error { +// maxCPUs and maxMemory are the resource limits for local job scheduling. +// Pass 0 for both to skip resource limit validation. 
+func (p *Process) Validate(maxCPUs float32, maxMemory int) error { if p.Info.ID == "" { return errors.New("process ID is required") } @@ -343,6 +354,16 @@ func (p *Process) Validate() error { } } + // Validate resource limits for local job types (docker/subprocess) + if p.Host.Type == "docker" || p.Host.Type == "subprocess" { + if maxCPUs > 0 && p.Config.Resources.CPUs > maxCPUs { + return fmt.Errorf("process requires %.2f CPUs but max allowed is %.2f", p.Config.Resources.CPUs, maxCPUs) + } + if maxMemory > 0 && p.Config.Resources.Memory > maxMemory { + return fmt.Errorf("process requires %dMB memory but max allowed is %dMB", p.Config.Resources.Memory, maxMemory) + } + } + // Validate Inputs for i, input := range p.Inputs { if input.ID == "" { diff --git a/api/public/css/main.css b/api/public/css/main.css index 31fa062..98af1a4 100644 --- a/api/public/css/main.css +++ b/api/public/css/main.css @@ -24,8 +24,12 @@ --log-error: var(--status-server-error-color); --log-debug: #007BFF; --log-info: green; + + --bar-used-color: #4caf50; + --bar-queued-color: #ff9800; } +/* overwrite for dark mode */ @media (prefers-color-scheme: dark) { :root { --font-color: #FFF; @@ -214,4 +218,88 @@ code[class*="language-"] { .footer-bar a { font-family: monospace; +} + +/* Resource Status Bars */ +.resource-section { + margin: 25px 0; + max-width: 600px; +} + +.resource-label { + font-weight: bold; + margin-bottom: 5px; +} + +.resource-label-secondary { + font-size: 0.9em; + color: var(--font-color-gray); + margin-top: 10px; + margin-bottom: 5px; +} + +.bar-container { + background-color: var(--table-row-even-bg); + border-radius: 4px; + height: 24px; + position: relative; + overflow: hidden; + border: 1px solid var(--table-border-color); +} + +.bar-queued-stack { + display: flex; + flex-direction: column; + gap: 4px; +} + +.bar-used { + background-color: var(--bar-used-color); + height: 100%; + position: absolute; + left: 0; + border-radius: 3px 0 0 3px; +} + +.bar-queued { + 
background-color: var(--bar-queued-color); + height: 18px; + border-radius: 3px; + border: 1px solid var(--table-border-color); +} + +.bar-queued-full { + width: 100%; +} + +.resource-legend { + margin-top: 30px; + display: flex; + gap: 20px; + flex-wrap: wrap; +} + +.legend-item { + display: flex; + align-items: center; + gap: 5px; +} + +.legend-box { + width: 20px; + height: 20px; + border-radius: 3px; + border: 1px solid var(--table-border-color); +} + +.legend-used { + background-color: var(--bar-used-color); +} + +.legend-queued { + background-color: var(--bar-queued-color); +} + +.legend-available { + background-color: var(--table-row-even-bg); } \ No newline at end of file diff --git a/api/views/resourceStatus.html b/api/views/resourceStatus.html new file mode 100644 index 0000000..57a1f0f --- /dev/null +++ b/api/views/resourceStatus.html @@ -0,0 +1,86 @@ +{{define "resourceStatus"}} + + + +
+ + +