Merged
3 changes: 3 additions & 0 deletions .github/workflows/e2e-tests.yml
@@ -29,6 +29,9 @@ jobs:
echo PLUGINS_DIR='/.data/plugins' >> .env
echo TMP_JOB_LOGS_DIR='/.data/tmp/logs' >> .env

echo MAX_LOCAL_CPUS=4 >> .env
echo MAX_LOCAL_MEMORY=4096 >> .env


echo MINIO_ACCESS_KEY_ID=user >> .env
echo MINIO_SECRET_ACCESS_KEY=password >> .env
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,22 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## Unreleased

### API
#### POST /processes/{processID}/execution
- Execution mode now determined per OGC API - Processes Requirements 25/26: honors `Prefer: respond-async` header when process supports both modes, defaults to sync otherwise
- Returns `Preference-Applied` response header when async preference is honored

#### GET /admin/resources
- New endpoint to view resource utilization for local jobs (docker, subprocess) and queue status

### Configuration
- New `MAX_LOCAL_CPUS` and `MAX_LOCAL_MEMORY` environment variables (or `--max-local-cpus` and `--max-local-memory` CLI flags) to set resource limits for local job scheduling
- Process definitions are validated against these limits at startup and when adding/updating processes via API
- Processes without explicit resource requirements use default values
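
The validation described above might look roughly like the following sketch. The `Requirements` type, the `validate` function, and the default values (1 CPU, 512 MB) are hypothetical; the changelog does not state the actual defaults or the shape of a process definition. The limits of 4 CPUs and 4096 MB match the values set in the e2e workflow.

```go
package main

import "fmt"

// Limits mirrors the two configured maxima (MAX_LOCAL_CPUS, MAX_LOCAL_MEMORY).
type Limits struct {
	MaxCPUs   float32
	MaxMemory int // MB
}

// Requirements is a hypothetical shape for a process's declared resources.
type Requirements struct {
	CPUs   float32
	Memory int // MB
}

// defaultRequirements holds assumed defaults for processes that declare
// nothing; the real default values are not given in the changelog.
var defaultRequirements = Requirements{CPUs: 1, Memory: 512}

// validate returns the effective requirements for a process and an error if
// they exceed the configured limits.
func validate(processID string, declared *Requirements, lim Limits) (Requirements, error) {
	req := defaultRequirements
	if declared != nil {
		req = *declared
	}
	if req.CPUs > lim.MaxCPUs {
		return req, fmt.Errorf("process %q requests %.2f CPUs, limit is %.2f",
			processID, req.CPUs, lim.MaxCPUs)
	}
	if req.Memory > lim.MaxMemory {
		return req, fmt.Errorf("process %q requests %d MB, limit is %d MB",
			processID, req.Memory, lim.MaxMemory)
	}
	return req, nil
}

func main() {
	lim := Limits{MaxCPUs: 4, MaxMemory: 4096}
	_, err := validate("too-big", &Requirements{CPUs: 8, Memory: 1024}, lim)
	fmt.Println("too-big:", err)
	req, err := validate("no-requirements", nil, lim)
	fmt.Println("no-requirements:", req, err)
}
```

Rejecting an oversized process at registration time means a job for it can never be enqueued and then wait for resources that will never become available.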

### Documentation
- Added sequence diagram for local scheduler

## [0.2.1] - 2025-12-03

### API
18 changes: 15 additions & 3 deletions DEV_GUIDE.md
@@ -15,10 +15,10 @@

## Auth
- If auth is enabled some or all routes are protected based on env variable `AUTH_LEVEL` settings.
- - The middleware validate and parse JWT to verify `X-ProcessAPI-User-Email` header and inject `X-ProcessAPI-User-Roles` header.
+ - The middleware validate and parse JWT to verify `X-SEPEX-User-Email` header and inject `X-SEPEX-User-Roles` header.
- A user can use tools like Postman to set these headers themselves, but if auth is enabled, they will be checked against the token. This setup allows adding submitter info to the database when auth is not enabled.
- - If auth is enabled `X-ProcessAPI-User-Email` header is mandatory.
- - Requests from Service Role will not be verified for `X-ProcessAPI-User-Email`.
+ - If auth is enabled `X-SEPEX-User-Email` header is mandatory.
+ - Requests from Service Role will not be verified for `X-SEPEX-User-Email`.
- Only service_accounts can post callbacks
- Requests from Admin Role are allowed to execute all processes, non-admins must have the role with same name as `processID` to execute that process.
- Requests from Admin Role are allowed to retrieve all jobs information, non admins can only retrieve information for jobs that they submitted.
@@ -30,6 +30,18 @@
## Scope
- The behavior of logging is unknown for AWS Batch processes with job definitions having number of attempts more than 1.

## Local Scheduler

**Design decisions:**

1. ResourceLimits calculated once at startup from flags/env vars. Dynamic reconfiguration rejected because a queued job could block forever if limits are reduced below its requirements after it was already validated and enqueued.

1. ResourcePool and PendingJobs use `sync.Mutex`. Channels add complexity without benefit for simple state. Go channels use internal mutexes anyway, so performance is similar.
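
To make the two decisions above concrete, here is a minimal sketch of a mutex-guarded pool with limits fixed at construction. The `pool` type and its `tryReserve`/`release` methods are hypothetical and far simpler than `jobs.ResourcePool`, whose implementation is not part of this diff.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical, simplified stand-in for jobs.ResourcePool.
// The maxima are set once and never change, so a job that passed
// validation can always fit once enough resources are released.
type pool struct {
	mu       sync.Mutex
	maxCPUs  float32
	usedCPUs float32
	maxMem   int // MB
	usedMem  int // MB
}

func newPool(maxCPUs float32, maxMemMB int) *pool {
	return &pool{maxCPUs: maxCPUs, maxMem: maxMemMB}
}

// tryReserve checks and reserves in one critical section. It returns false
// and changes nothing if the request does not currently fit.
func (p *pool) tryReserve(cpus float32, memMB int) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.usedCPUs+cpus > p.maxCPUs || p.usedMem+memMB > p.maxMem {
		return false
	}
	p.usedCPUs += cpus
	p.usedMem += memMB
	return true
}

// release returns previously reserved resources to the pool.
func (p *pool) release(cpus float32, memMB int) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.usedCPUs -= cpus
	p.usedMem -= memMB
}

func main() {
	p := newPool(4, 4096)

	var wg sync.WaitGroup
	var countMu sync.Mutex
	granted := 0
	// Ten goroutines each ask for 1 CPU and 512 MB; only four can fit.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if p.tryReserve(1, 512) {
				countMu.Lock()
				granted++
				countMu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("granted:", granted) // prints "granted: 4"
}
```

Because the check and the update happen under the same lock, two concurrent requests cannot both observe free capacity and over-commit the pool.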


## Release/Versioning/Changelog

The project uses an automated release workflow triggered by semver tags (e.g., `v1.0.0`, `v1.0.0-beta`). The workflow validates prerequisites, runs security scans, builds multi-platform container images, and creates GitHub releases with auto-generated release notes.
3 changes: 2 additions & 1 deletion README.md
@@ -67,7 +67,8 @@ Processes are computational tasks described through a configuration file that ca

### Jobs
![](imgs/readme/jobs.png)
- Each execution of a process is called a job. A job can be synchronous or asynchronous depending on which host it is being executed upon. Synchronous jobs return responses after the job has reached a finished state, meaning either successful or failed. The asynchronous jobs return a response immediately with a job id for the client so that the client can monitor the jobs.
+ Each execution of a process is called a job. A job can be synchronous or asynchronous depending on the process configuration and client preference. Synchronous jobs return responses after the job has reached a finished state, meaning either successful or failed. The asynchronous jobs return a response immediately with a job id for the client so that the client can monitor the jobs.


*Note on Processes: The developers must make sure they choose the right platform to execute a process. The processes that are short-lived and fast and do not create a file resource as an output, for example getting the water surface elevation values for a coordinate from cloud raster, must be registered to run on the local machine so that they are synchronous. These kinds of processes should output data in JSON format.*

8 changes: 4 additions & 4 deletions api/auth/keycloak.go
@@ -132,14 +132,14 @@ func (kas *KeycloakAuthStrategy) ValidateToken(tokenString string) (*Claims, err
return &claims, nil
}

- // Validate X-ProcessAPI-User-Email header against user from claims
+ // Validate X-SEPEX-User-Email header against user from claims
func (kas *KeycloakAuthStrategy) ValidateUser(c echo.Context, claims *Claims) (err error) {
roles := claims.RealmAccess["roles"]

if kas.ServiceRoleName != "" && overlap(roles, []string{kas.ServiceRoleName}) {
// assume provided header is correct
- } else if claims.Email == "" || !(c.Request().Header.Get("X-ProcessAPI-User-Email") == claims.Email) {
- return fmt.Errorf("invalid X-ProcessAPI-User-Email header")
+ } else if claims.Email == "" || !(c.Request().Header.Get("X-SEPEX-User-Email") == claims.Email) {
+ return fmt.Errorf("invalid X-SEPEX-User-Email header")
}

return nil
@@ -150,7 +150,7 @@ func (kas *KeycloakAuthStrategy) SetUserRolesHeader(c echo.Context, claims *Clai
roles, exists := claims.RealmAccess["roles"]
if exists {
rolesString := strings.Join(roles, ",")
- c.Request().Header.Set("X-ProcessAPI-User-Roles", rolesString)
+ c.Request().Header.Set("X-SEPEX-User-Roles", rolesString)
} else {
return nil
}
67 changes: 65 additions & 2 deletions api/handlers/config.go
@@ -8,6 +8,8 @@ import (
"fmt"
"io"
"os"
"runtime"
"strconv"
"strings"
"text/template"

@@ -29,6 +31,14 @@ func (t Template) Render(w io.Writer, name string, data interface{}, c echo.Cont
return t.templates.ExecuteTemplate(w, name, data)
}

// ResourceLimits holds the maximum resource limits for job scheduling.
// This is read once at startup and shared across the application to ensure
// consistent validation between process registration and job execution.
type ResourceLimits struct {
MaxCPUs float32
MaxMemory int // in MB
}

// Config holds the configuration settings for the REST API server.
type Config struct {
// Only settings that are typically environment-specific and can be loaded from
@@ -39,6 +49,9 @@ type Config struct {
AuthLevel int
AdminRoleName string
ServiceRoleName string

// Resource limits for local job scheduling (docker/subprocess)
ResourceLimits *ResourceLimits
}

// RESTHandler encapsulates the operational components and dependencies necessary for handling
@@ -56,6 +69,9 @@ type RESTHandler struct {
DB jobs.Database
MessageQueue *jobs.MessageQueue
ActiveJobs *jobs.ActiveJobs
PendingJobs *jobs.PendingJobs
ResourcePool *jobs.ResourcePool
QueueWorker *jobs.QueueWorker
ProcessList *pr.ProcessList
Config *Config
}
@@ -71,7 +87,7 @@ func prettyPrint(v interface{}) string {

// Initializes resources and return a new handler
// errors are fatal
- func NewRESTHander(gitTag string) *RESTHandler {
+ func NewRESTHander(gitTag string, maxLocalCPUs string, maxLocalMemory string) *RESTHandler {
apiName, exist := os.LookupEnv("API_NAME")
if !exist {
log.Warn("env variable API_NAME not set")
@@ -82,6 +98,9 @@ func NewRESTHander(gitTag string) *RESTHandler {
log.Warn("env variable REPO_URL not set")
}

// Calculate resource limits once at startup
resourceLimits := newResourceLimits(maxLocalCPUs, maxLocalMemory)

// working with pointers here so as not to copy large templates, yamls, and ActiveJobs
config := RESTHandler{
Name: apiName,
@@ -101,6 +120,7 @@ func NewRESTHander(gitTag string) *RESTHandler {
Config: &Config{
AdminRoleName: os.Getenv("AUTH_ADMIN_ROLE"),
ServiceRoleName: os.Getenv("AUTH_SERVICE_ROLE"),
ResourceLimits: resourceLimits,
},
}

@@ -159,14 +179,23 @@ func NewRESTHander(gitTag string) *RESTHandler {
ac.Jobs = make(map[string]*jobs.Job)
config.ActiveJobs = &ac

// Setup Pending Jobs queue for async jobs waiting for resources
config.PendingJobs = jobs.NewPendingJobs()

// Setup Resource Pool for tracking CPU/memory availability
config.ResourcePool = jobs.NewResourcePool(resourceLimits.MaxCPUs, resourceLimits.MaxMemory)

// Setup Queue Worker to process pending jobs
config.QueueWorker = jobs.NewQueueWorker(config.PendingJobs, config.ResourcePool)

config.MessageQueue = &jobs.MessageQueue{
StatusChan: make(chan jobs.StatusMessage, 500),
JobDone: make(chan jobs.Job, 1),
}

// Load process definitions from the plugins directory
pluginsDir := os.Getenv("PLUGINS_DIR") // We already know this env variable exists because it is checked in the pluginsInit function
- processList, err := pr.LoadProcesses(pluginsDir)
+ processList, err := pr.LoadProcesses(pluginsDir, resourceLimits.MaxCPUs, resourceLimits.MaxMemory)
if err != nil {
log.Fatal(err)
}
@@ -233,3 +262,37 @@ func NewStorageService(providerType string) (*s3.S3, error) {
return nil, fmt.Errorf("unsupported storage provider type")
}
}

// newResourceLimits creates ResourceLimits from the provided values.
// Values come from CLI flags which already have env var fallback via resolveValue().
// Falls back to 80% of system CPUs and 8GB memory if not specified.
func newResourceLimits(maxLocalCPUsStr string, maxLocalMemoryStr string) *ResourceLimits {
numCPUs := float32(runtime.NumCPU())

// Default to 80% of system CPUs
maxCPUs := numCPUs * 0.8
if maxLocalCPUsStr != "" {
if parsed, err := strconv.ParseFloat(maxLocalCPUsStr, 32); err == nil {
maxCPUs = float32(parsed)
} else {
log.Warnf("Invalid MAX_LOCAL_CPUS value: %s, using default %.2f", maxLocalCPUsStr, maxCPUs)
}
}

// Default to 8GB
maxMemory := 8192
if maxLocalMemoryStr != "" {
if parsed, err := strconv.Atoi(maxLocalMemoryStr); err == nil {
maxMemory = parsed
} else {
log.Warnf("Invalid MAX_LOCAL_MEMORY value: %s, using default %d", maxLocalMemoryStr, maxMemory)
}
}

log.Infof("ResourceLimits initialized: maxCPUs=%.2f, maxMemory=%dMB", maxCPUs, maxMemory)

return &ResourceLimits{
MaxCPUs: maxCPUs,
MaxMemory: maxMemory,
}
}
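
One thing worth noting about `newResourceLimits`: `strconv.ParseFloat` and `strconv.Atoi` succeed on values such as `0`, `-1`, or `NaN`, so those would be accepted as limits. The sketch below shows one possible way to fall back to the default in those cases as well. The helper names `parseCPULimit` and `parseMemoryLimit` are hypothetical, and treating non-positive values as invalid is an assumption, not something the PR specifies.

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

// parseCPULimit returns the parsed limit, or def when s is empty,
// malformed, non-finite, or not positive.
func parseCPULimit(s string, def float32) float32 {
	if s == "" {
		return def
	}
	v, err := strconv.ParseFloat(s, 32)
	if err != nil || math.IsNaN(v) || math.IsInf(v, 0) || v <= 0 {
		return def
	}
	return float32(v)
}

// parseMemoryLimit returns the parsed limit in MB, or def when s is empty,
// malformed, or not positive.
func parseMemoryLimit(s string, def int) int {
	if s == "" {
		return def
	}
	v, err := strconv.Atoi(s)
	if err != nil || v <= 0 {
		return def
	}
	return v
}

func main() {
	for _, s := range []string{"2.5", "", "abc", "0", "NaN"} {
		fmt.Printf("MAX_LOCAL_CPUS=%q -> %.2f\n", s, parseCPULimit(s, 8))
	}
	for _, s := range []string{"4096", "", "-1"} {
		fmt.Printf("MAX_LOCAL_MEMORY=%q -> %d\n", s, parseMemoryLimit(s, 8192))
	}
}
```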
83 changes: 83 additions & 0 deletions api/handlers/execution_mode.go
@@ -0,0 +1,83 @@
package handlers

import (
"app/utils"
"strings"
)

// ExecutionModeResult contains the determined execution mode and whether
// a Prefer header preference was honored (for Preference-Applied response header)
type ExecutionModeResult struct {
Mode string // "sync-execute" or "async-execute"
PreferenceApplied string // Value for Preference-Applied header, empty if none
}

// DetermineExecutionMode implements OGC API - Processes execution mode logic
// per Requirements 25 and 26, and Recommendation 12A.
//
// Decision matrix:
// - No Prefer header + async-only process → async (Req 25A)
// - No Prefer header + sync-only process → sync (Req 25B)
// - No Prefer header + both modes → sync (Req 25C - default)
// - Prefer: respond-async + async-only → async (Req 26A)
// - Prefer: respond-async + sync-only → sync (Req 26B - ignore preference)
// - Prefer: respond-async + both modes → async (Req 26C + Rec 12A - honor preference)
func DetermineExecutionMode(jobControlOptions []string, preferHeader string) ExecutionModeResult {
supportsSync := utils.StringInSlice("sync-execute", jobControlOptions)
supportsAsync := utils.StringInSlice("async-execute", jobControlOptions)
wantsAsync := parseRespondAsyncPreference(preferHeader)

result := ExecutionModeResult{}

// Case 1: Process only supports async (Req 25A, 26A)
if supportsAsync && !supportsSync {
result.Mode = "async-execute"
return result
}

// Case 2: Process only supports sync (Req 25B, 26B)
if supportsSync && !supportsAsync {
result.Mode = "sync-execute"
// If client requested async but we can only do sync, no preference was applied
return result
}

// Case 3: Process supports both modes
if wantsAsync {
// Req 26C + Rec 12A: Honor the async preference
result.Mode = "async-execute"
result.PreferenceApplied = "respond-async"
return result
}

// Req 25C: Default to sync when no preference given
result.Mode = "sync-execute"
return result
}

// parseRespondAsyncPreference checks if the Prefer header contains "respond-async"
// The Prefer header can contain multiple comma-separated preferences.
// Example: "respond-async, wait=10" or "respond-async"
func parseRespondAsyncPreference(preferHeader string) bool {
if preferHeader == "" {
return false
}

// Prefer header values can be comma-separated
// Each preference can have parameters separated by semicolons
// We're looking for "respond-async" token
preferences := strings.Split(preferHeader, ",")
for _, pref := range preferences {
// Trim whitespace and get the preference name (before any parameters)
pref = strings.TrimSpace(pref)
// Handle parameters like "respond-async; wait=10"
parts := strings.SplitN(pref, ";", 2)
prefName := strings.TrimSpace(parts[0])

if prefName == "respond-async" {
return true
}
}

return false
}
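
The decision matrix documented on `DetermineExecutionMode` can be exercised with a small table-driven sketch. The code below is a condensed re-statement for illustration only: `contains` stands in for `utils.StringInSlice` (not shown in this diff), and `determineMode` takes an already-parsed boolean instead of the raw `Prefer` header.

```go
package main

import "fmt"

// contains stands in for utils.StringInSlice, whose implementation is not
// part of this diff.
func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// determineMode condenses the decision logic above. wantsAsync represents an
// already-parsed "Prefer: respond-async" header.
func determineMode(options []string, wantsAsync bool) (mode, applied string) {
	supportsSync := contains(options, "sync-execute")
	supportsAsync := contains(options, "async-execute")
	switch {
	case supportsAsync && !supportsSync:
		return "async-execute", ""
	case supportsSync && !supportsAsync:
		return "sync-execute", ""
	case wantsAsync:
		return "async-execute", "respond-async"
	default:
		return "sync-execute", ""
	}
}

func main() {
	syncOnly := []string{"sync-execute"}
	asyncOnly := []string{"async-execute"}
	both := []string{"sync-execute", "async-execute"}

	cases := []struct {
		name       string
		options    []string
		wantsAsync bool
	}{
		{"no Prefer, async-only", asyncOnly, false},
		{"no Prefer, sync-only", syncOnly, false},
		{"no Prefer, both", both, false},
		{"respond-async, async-only", asyncOnly, true},
		{"respond-async, sync-only", syncOnly, true},
		{"respond-async, both", both, true},
	}
	for _, c := range cases {
		mode, applied := determineMode(c.options, c.wantsAsync)
		fmt.Printf("%-28s -> %-13s applied=%q\n", c.name, mode, applied)
	}
}
```

Note that a process declaring neither option falls through to the "both modes" branch in this logic, so it would run synchronously unless the client asks for async.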