From b2df8bd8f63dcc5d1e5dccc67f37f6dd0304ec3e Mon Sep 17 00:00:00 2001 From: "fenjianhui.fjh" Date: Wed, 21 Jan 2026 21:01:41 +0800 Subject: [PATCH] refactor(task-executor): improve task lifecycle management and timeout handling - Remove deprecated EnableContainerMode flag and related logic from config and executor initialization - Add TimeoutSeconds field to Process spec for task timeout control - Enhance process executor to detect task timeout and report Timeout state with detailed substatus - Refactor task manager to maintain precise active task count via dynamic evaluation - Implement asynchronous task stop on deletion and timeout to avoid reconcile loop blocking - Update task status reconciliation with proper locking and persistent state updates - Improve internal to API task conversion to reflect timeout termination and pod status accurately - Add PodTemplateSpec support to tasks and propagate to API representations - Expand tests to cover async stop on delete, timeout detection, active task counting, and status conversion - Remove unused cached activeTasks counter and improve concurrency safety via stopping map - Simplify container executor initialization by always creating executor without conditional flag - Clean up logging and error handling in task lifecycle operations --- kubernetes/cmd/task-executor/main.go | 5 - .../task_scheduling_strategy_default.go | 18 +- .../internal/task-executor/config/config.go | 38 ++-- .../task-executor/manager/task_manager.go | 200 ++++++++--------- .../manager/task_manager_test.go | 208 ++++++++++++++++++ .../task-executor/runtime/composite.go | 12 +- .../internal/task-executor/runtime/process.go | 38 +++- .../task-executor/runtime/process_test.go | 134 ++++++++++- .../internal/task-executor/server/handler.go | 163 +++++++++++--- .../task-executor/server/handler_test.go | 186 ++++++++++++++++ .../internal/task-executor/types/task.go | 9 +- kubernetes/pkg/task-executor/types.go | 2 + kubernetes/test/e2e_task/task_e2e_test.go | 115 ++++++++++ 13 files changed, 927 insertions(+), 201 deletions(-) diff --git a/kubernetes/cmd/task-executor/main.go b/kubernetes/cmd/task-executor/main.go index 42332193..6000af05 100644 --- a/kubernetes/cmd/task-executor/main.go +++ b/kubernetes/cmd/task-executor/main.go @@ -53,11 +53,6 @@ func main() { klog.ErrorS(err, "failed to create executor") os.Exit(1) } - mode := "process" - if cfg.EnableContainerMode { - mode = "container" - } - klog.InfoS("executor initialized", "mode", mode) // Initialize TaskManager taskManager, err := manager.NewTaskManager(cfg, taskStore, exec) diff --git a/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go b/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go index f30bab2d..f35cdd20 100644 --- a/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go +++ b/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go @@ -69,17 +69,19 @@ func (s *DefaultTaskSchedulingStrategy) getTaskSpec(batchSbx *sandboxv1alpha1.Ba return nil, fmt.Errorf("batchsandbox: failed to unmarshal %s to TaskTemplateSpec, idx %d, err %w", modified, idx, err) } task.Process = &api.Process{ - Command: newTaskTemplate.Spec.Process.Command, - Args: newTaskTemplate.Spec.Process.Args, - Env: newTaskTemplate.Spec.Process.Env, - WorkingDir: newTaskTemplate.Spec.Process.WorkingDir, + Command: newTaskTemplate.Spec.Process.Command, + Args: newTaskTemplate.Spec.Process.Args, + Env: newTaskTemplate.Spec.Process.Env, + WorkingDir: 
newTaskTemplate.Spec.Process.WorkingDir, + TimeoutSeconds: batchSbx.Spec.TaskTemplate.Spec.TimeoutSeconds, } } else if batchSbx.Spec.TaskTemplate != nil && batchSbx.Spec.TaskTemplate.Spec.Process != nil { task.Process = &api.Process{ - Command: batchSbx.Spec.TaskTemplate.Spec.Process.Command, - Args: batchSbx.Spec.TaskTemplate.Spec.Process.Args, - Env: batchSbx.Spec.TaskTemplate.Spec.Process.Env, - WorkingDir: batchSbx.Spec.TaskTemplate.Spec.Process.WorkingDir, + Command: batchSbx.Spec.TaskTemplate.Spec.Process.Command, + Args: batchSbx.Spec.TaskTemplate.Spec.Process.Args, + Env: batchSbx.Spec.TaskTemplate.Spec.Process.Env, + WorkingDir: batchSbx.Spec.TaskTemplate.Spec.Process.WorkingDir, + TimeoutSeconds: batchSbx.Spec.TaskTemplate.Spec.TimeoutSeconds, } } return task, nil diff --git a/kubernetes/internal/task-executor/config/config.go b/kubernetes/internal/task-executor/config/config.go index 9e8f8eee..2ba05ee5 100644 --- a/kubernetes/internal/task-executor/config/config.go +++ b/kubernetes/internal/task-executor/config/config.go @@ -21,28 +21,26 @@ import ( ) type Config struct { - DataDir string - ListenAddr string - CRISocket string - ReadTimeout time.Duration - WriteTimeout time.Duration - ReconcileInterval time.Duration - EnableSidecarMode bool - EnableContainerMode bool - MainContainerName string + DataDir string + ListenAddr string + CRISocket string + ReadTimeout time.Duration + WriteTimeout time.Duration + ReconcileInterval time.Duration + EnableSidecarMode bool + MainContainerName string } func NewConfig() *Config { return &Config{ - DataDir: "/var/lib/sandbox/tasks", - ListenAddr: "0.0.0.0:5758", - CRISocket: "/var/run/containerd/containerd.sock", - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - ReconcileInterval: 500 * time.Millisecond, - EnableContainerMode: false, - EnableSidecarMode: false, - MainContainerName: "main", + DataDir: "/var/lib/sandbox/tasks", + ListenAddr: "0.0.0.0:5758", + CRISocket: "/var/run/containerd/containerd.sock", + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + ReconcileInterval: 500 * time.Millisecond, + EnableSidecarMode: false, + MainContainerName: "main", } } @@ -56,9 +54,6 @@ func (c *Config) LoadFromEnv() { if v := os.Getenv("CRI_SOCKET"); v != "" { c.CRISocket = v } - if v := os.Getenv("ENABLE_CONTAINER_MODE"); v == "true" { - c.EnableContainerMode = true - } if v := os.Getenv("ENABLE_SIDECAR_MODE"); v == "true" { c.EnableSidecarMode = true } @@ -71,7 +66,6 @@ func (c *Config) LoadFromFlags() { flag.StringVar(&c.DataDir, "data-dir", c.DataDir, "data storage directory") flag.StringVar(&c.ListenAddr, "listen-addr", c.ListenAddr, "service listen address") flag.StringVar(&c.CRISocket, "cri-socket", c.CRISocket, "CRI socket path for container runner mode") - flag.BoolVar(&c.EnableContainerMode, "enable-container-mode", c.EnableContainerMode, "enable container runner mode") flag.BoolVar(&c.EnableSidecarMode, "enable-sidecar-mode", c.EnableSidecarMode, "enable sidecar runner mode") flag.StringVar(&c.MainContainerName, "main-container-name", c.MainContainerName, "main container name") flag.Parse() diff --git a/kubernetes/internal/task-executor/manager/task_manager.go b/kubernetes/internal/task-executor/manager/task_manager.go index 48090e1b..0201f5c8 100644 --- a/kubernetes/internal/task-executor/manager/task_manager.go +++ b/kubernetes/internal/task-executor/manager/task_manager.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "reflect" "sync" "time" @@ -37,11 +38,14 @@ const ( type taskManager struct { 
mu sync.RWMutex tasks map[string]*types.Task // name -> task - // TODO we need design queue for pending tasks - activeTasks int // Count of active tasks (not deleted AND not terminated) - store store.TaskStore - executor runtime.Executor - config *config.Config + + store store.TaskStore + executor runtime.Executor + config *config.Config + + // stopping tracks tasks that are currently being stopped. + // This prevents duplicate Stop calls and status rollback during async stop. + stopping map[string]bool // Reconcile loop control stopCh chan struct{} @@ -65,6 +69,7 @@ func NewTaskManager(cfg *config.Config, taskStore store.TaskStore, exec runtime. store: taskStore, executor: exec, config: cfg, + stopping: make(map[string]bool), stopCh: make(chan struct{}), doneCh: make(chan struct{}), }, nil @@ -83,6 +88,18 @@ func (m *taskManager) isTaskActive(task *types.Task) bool { return state == types.TaskStatePending || state == types.TaskStateRunning } +// countActiveTasks counts tasks that are active (not deleted AND not terminated). +// Must be called with lock held. +func (m *taskManager) countActiveTasks() int { + count := 0 + for _, task := range m.tasks { + if m.isTaskActive(task) { + count++ + } + } + return count +} + // Create creates a new task and starts execution. func (m *taskManager) Create(ctx context.Context, task *types.Task) (*types.Task, error) { if task == nil { @@ -100,8 +117,8 @@ func (m *taskManager) Create(ctx context.Context, task *types.Task) (*types.Task return nil, fmt.Errorf("task %s already exists", task.Name) } - // Enforce single task limitation using the cached counter - if m.activeTasks >= maxConcurrentTasks { + // Enforce single task limitation using real-time count + if m.countActiveTasks() >= maxConcurrentTasks { return nil, fmt.Errorf("maximum concurrent tasks (%d) reached, cannot create new task", maxConcurrentTasks) } @@ -133,14 +150,10 @@ func (m *taskManager) Create(ctx context.Context, task *types.Task) (*types.Task // Safety fallback: Ensure task has a state if task.Status.State == "" { task.Status.State = types.TaskStatePending - task.Status.Reason = "Initialized" } // Add to memory m.tasks[task.Name] = task - if m.isTaskActive(task) { - m.activeTasks++ - } klog.InfoS("task created successfully", "name", task.Name) return task, nil @@ -244,11 +257,6 @@ func (m *taskManager) softDeleteLocked(ctx context.Context, task *types.Task) er return nil // Already marked } - // If the task was active, decrement the active count - if m.isTaskActive(task) { - m.activeTasks-- - } - now := time.Now() task.DeletionTimestamp = &now @@ -294,8 +302,8 @@ func (m *taskManager) createTaskLocked(ctx context.Context, task *types.Task) er return fmt.Errorf("task %s already exists", task.Name) } - // Enforce single task limitation using the cached counter - if m.activeTasks >= maxConcurrentTasks { + // Enforce single task limitation using real-time count + if m.countActiveTasks() >= maxConcurrentTasks { return fmt.Errorf("maximum concurrent tasks (%d) reached, cannot create new task", maxConcurrentTasks) } @@ -324,36 +332,6 @@ func (m *taskManager) createTaskLocked(ctx context.Context, task *types.Task) er // Add to memory m.tasks[task.Name] = task - if m.isTaskActive(task) { - m.activeTasks++ - } - return nil -} - -// deleteTaskLocked deletes a task without acquiring the lock (must be called with lock held). 
-func (m *taskManager) deleteTaskLocked(ctx context.Context, name string) error { - task, exists := m.tasks[name] - if !exists { - // Already deleted, no error - klog.InfoS("task not found, skipping delete", "name", name) - return nil - } - - // Stop task execution - if err := m.executor.Stop(ctx, task); err != nil { - klog.ErrorS(err, "failed to stop task", "name", name) - // Continue with deletion even if stop fails - } - - // Delete from store - if err := m.store.Delete(ctx, name); err != nil { - return fmt.Errorf("failed to delete task from store: %w", err) - } - - // Remove from memory - delete(m.tasks, name) - - klog.InfoS("task deleted successfully", "name", name) return nil } @@ -398,11 +376,6 @@ func (m *taskManager) recoverTasks(ctx context.Context) error { // Add to memory m.tasks[task.Name] = task - // Update active count - if m.isTaskActive(task) { - m.activeTasks++ - } - klog.InfoS("recovered task", "name", task.Name, "state", task.Status.State, "deleting", task.DeletionTimestamp != nil) } @@ -432,71 +405,92 @@ func (m *taskManager) reconcileLoop(ctx context.Context) { // reconcileTasks updates the status of all tasks and handles deletion. func (m *taskManager) reconcileTasks(ctx context.Context) { - m.mu.RLock() - tasks := make([]*types.Task, 0, len(m.tasks)) - for _, task := range m.tasks { - if task != nil { - tasks = append(tasks, task) - } - } - m.mu.RUnlock() + m.mu.Lock() + defer m.mu.Unlock() - // Update each task's status - for _, task := range tasks { + var tasksToDelete []string + + for name, task := range m.tasks { + if task == nil { + continue + } status, err := m.executor.Inspect(ctx, task) if err != nil { - klog.ErrorS(err, "failed to inspect task", "name", task.Name) + klog.ErrorS(err, "failed to inspect task", "name", name) continue } + state := status.State - // Acquire lock to safely update status and active count - m.mu.Lock() - wasActive := m.isTaskActive(task) + // Determine if we should stop the task + shouldStop := false + stopReason := "" - // Update status - task.Status = *status + if task.DeletionTimestamp != nil && !m.stopping[name] { + if !isTerminalState(state) { + shouldStop = true + stopReason = "deletion requested" + } + } else if state == types.TaskStateTimeout && !m.stopping[name] { + shouldStop = true + stopReason = "timeout exceeded" + } - isActive := m.isTaskActive(task) + if shouldStop { + klog.InfoS("stopping task", "name", name, "reason", stopReason) + m.stopping[name] = true - // If task transitioned from Active -> Inactive (Terminated), decrement active count - if wasActive && !isActive { - m.activeTasks-- - } - m.mu.Unlock() - - // Handle Deletion - if task.DeletionTimestamp != nil { - if task.Status.State == types.TaskStateSucceeded || task.Status.State == types.TaskStateFailed { - // Task is fully terminated, finalize deletion (remove from store/memory) - klog.InfoS("task terminated, finalizing deletion", "name", task.Name) - m.mu.Lock() - if err := m.deleteTaskLocked(ctx, task.Name); err != nil { - klog.ErrorS(err, "failed to finalize task deletion", "name", task.Name) + // Async stop to avoid blocking the reconcile loop + go func(t *types.Task, taskName string) { + defer func() { + m.mu.Lock() + delete(m.stopping, taskName) + m.mu.Unlock() + }() + + if err := m.executor.Stop(ctx, t); err != nil { + klog.ErrorS(err, "failed to stop task", "name", taskName) } - m.mu.Unlock() - continue - } else { - // Task is still running, trigger Stop - klog.InfoS("stopping task marked for deletion", "name", task.Name) - if err := 
m.executor.Stop(ctx, task); err != nil { - klog.ErrorS(err, "failed to stop task", "name", task.Name) + klog.InfoS("task stopped", "name", taskName) + }(task, name) + } + + // Determine if we can finalize deletion + if task.DeletionTimestamp != nil && isTerminalState(state) { + klog.InfoS("task terminated, finalizing deletion", "name", name) + tasksToDelete = append(tasksToDelete, name) + } + + // Update status only if not stopping (prevent status rollback during async stop) + if !m.stopping[name] { + if !reflect.DeepEqual(task.Status, *status) { + task.Status = *status + if err := m.store.Update(ctx, task); err != nil { + klog.ErrorS(err, "failed to update task status in store", "name", name) } } } + } - // Update task status in memory only. - // We do not need to persist to store here because Persistent fields (Spec, PID, etc.) do not change during the reconcile loop. - // The Status struct IS persisted, but we choose not to persist every few seconds if only runtime state changes. - // However, since we made Status a first-class citizen and it's small, we COULD persist it. - // But for performance, we stick to the decision: only persist on significant changes (Create/Delete). - // Note: If we want to persist ExitCode/FinishedAt, we might need to Update store when state changes to Terminated. - // Let's add that optimization: if state changed to Terminated, persist it. - if wasActive && !isActive { - if err := m.store.Update(ctx, task); err != nil { - klog.ErrorS(err, "failed to update task status in store", "name", task.Name) - } + // Finalize deletions + for _, name := range tasksToDelete { + if _, exists := m.tasks[name]; !exists { + continue + } + + if err := m.store.Delete(ctx, name); err != nil { + klog.ErrorS(err, "failed to delete task from store", "name", name) + continue } + + delete(m.tasks, name) + delete(m.stopping, name) + klog.InfoS("task deleted successfully", "name", name) } } -// createTaskLocked creates a task without acquiring the lock (must be called with lock held). +// isTerminalState returns true if the task will not transition to another state. 
+func isTerminalState(state types.TaskState) bool { + return state == types.TaskStateSucceeded || + state == types.TaskStateFailed || + state == types.TaskStateNotFound +} diff --git a/kubernetes/internal/task-executor/manager/task_manager_test.go b/kubernetes/internal/task-executor/manager/task_manager_test.go index 3deb2629..fcb6f36c 100644 --- a/kubernetes/internal/task-executor/manager/task_manager_test.go +++ b/kubernetes/internal/task-executor/manager/task_manager_test.go @@ -16,9 +16,12 @@ package manager import ( "context" + "os/exec" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/config" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/runtime" store "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/storage" @@ -488,3 +491,208 @@ func TestTaskManager_SyncNil(t *testing.T) { t.Error("Sync() should fail for nil desired list") } } + +func TestTaskManager_AsyncStopOnDelete(t *testing.T) { + mgr, _ := setupTestManager(t) + mgr.Start(context.Background()) + defer mgr.Stop() + + ctx := context.Background() + + timeoutSec := int64(30) + task := &types.Task{ + Name: "long-running-task", + Process: &api.Process{ + Command: []string{"sleep", "30"}, + TimeoutSeconds: &timeoutSec, + }, + } + + // Create task + created, err := mgr.Create(ctx, task) + if err != nil { + t.Fatalf("Create() failed: %v", err) + } + defer cleanupTask(t, mgr, task.Name) + + // Verify task is running + assert.Equal(t, types.TaskStateRunning, created.Status.State) + + // Record the time before delete + beforeDelete := time.Now() + + // Delete task (should trigger async stop) + err = mgr.Delete(ctx, task.Name) + if err != nil { + t.Fatalf("Delete() failed: %v", err) + } + + // Verify DeletionTimestamp is set immediately (soft delete) + got, err := mgr.Get(ctx, task.Name) + if err != nil { + t.Fatalf("Get() after Delete failed: %v", err) + } + if got.DeletionTimestamp == nil { + t.Error("DeletionTimestamp should be set immediately after Delete()") + } + + // Verify Delete returned quickly (not blocked by Stop) + deleteDuration := time.Since(beforeDelete) + if deleteDuration > 500*time.Millisecond { + t.Errorf("Delete() took too long (%v), should be fast (async stop)", deleteDuration) + } + + // Wait for task to be finalized + deadline := time.Now().Add(15 * time.Second) + for time.Now().Before(deadline) { + _, err := mgr.Get(ctx, task.Name) + if err != nil { + // Task is gone, success + return + } + time.Sleep(100 * time.Millisecond) + } + t.Error("Task was not finalized within timeout after async stop") +} + +func TestTaskManager_TimeoutHandling(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found, skipping timeout test") + } + + mgr, _ := setupTestManager(t) + mgr.Start(context.Background()) + defer mgr.Stop() + + ctx := context.Background() + + // Create task with short timeout + timeoutSec := int64(2) + task := &types.Task{ + Name: "timeout-task", + Process: &api.Process{ + Command: []string{"sleep", "30"}, + TimeoutSeconds: &timeoutSec, + }, + } + + _, err := mgr.Create(ctx, task) + if err != nil { + t.Fatalf("Create() failed: %v", err) + } + defer cleanupTask(t, mgr, task.Name) + + // Wait for timeout to be detected and async stop triggered + time.Sleep(3 * time.Second) + + // Check task status - should be Timeout or Failed (after stop) + got, err := mgr.Get(ctx, task.Name) + if err != nil { + t.Fatalf("Get() failed: %v", err) + } + + // State should be Timeout (during 
stop) or Failed (after stop completes) + if got.Status.State != types.TaskStateTimeout && got.Status.State != types.TaskStateFailed { + t.Errorf("Expected Timeout or Failed state, got: %s", got.Status.State) + } + + // If in Timeout state, verify reason + if got.Status.State == types.TaskStateTimeout { + assert.NotEmpty(t, got.Status.SubStatuses) + assert.Equal(t, "TaskTimeout", got.Status.SubStatuses[0].Reason) + } + + // Wait for final state + deadline := time.Now().Add(15 * time.Second) + for time.Now().Before(deadline) { + got, err := mgr.Get(ctx, task.Name) + if err != nil { + // Task was deleted, that's also acceptable + return + } + if got.Status.State == types.TaskStateFailed { + // Stop completed + return + } + time.Sleep(200 * time.Millisecond) + } +} + +func TestTaskManager_CountActiveTasks(t *testing.T) { + mgr, _ := setupTestManager(t) + mgr.Start(context.Background()) + defer mgr.Stop() + ctx := context.Background() + + // Initially empty + activeCount := mgr.(*taskManager).countActiveTasks() + if activeCount != 0 { + t.Errorf("Initial active count = %d, want 0", activeCount) + } + + // Create a short-lived task that will complete quickly + task1 := &types.Task{ + Name: "quick-task-1", + Process: &api.Process{ + Command: []string{"echo", "done"}, + }, + } + _, err := mgr.Create(ctx, task1) + if err != nil { + t.Fatalf("Create() failed: %v", err) + } + defer mgr.Delete(ctx, task1.Name) + + // Wait for task1 to complete + time.Sleep(500 * time.Millisecond) + + // Should have 0 active tasks after task1 completes + activeCount = mgr.(*taskManager).countActiveTasks() + if activeCount != 0 { + t.Errorf("Active count after task1 completion = %d, want 0", activeCount) + } + + // Create a running task + task2 := &types.Task{ + Name: "active-task-2", + Process: &api.Process{ + Command: []string{"sleep", "5"}, + }, + } + _, err = mgr.Create(ctx, task2) + if err != nil { + t.Fatalf("Create() failed: %v", err) + } + defer mgr.Delete(ctx, task2.Name) + + // Should have 1 active task + activeCount = mgr.(*taskManager).countActiveTasks() + if activeCount != 1 { + t.Errorf("Active count after create = %d, want 1", activeCount) + } +} + +func TestIsTerminalState(t *testing.T) { + tests := []struct { + name string + state types.TaskState + expected bool + }{ + {"Succeeded is terminal", types.TaskStateSucceeded, true}, + {"Failed is terminal", types.TaskStateFailed, true}, + {"NotFound is terminal", types.TaskStateNotFound, true}, + {"Pending is not terminal", types.TaskStatePending, false}, + {"Running is not terminal", types.TaskStateRunning, false}, + {"Unknown is not terminal", types.TaskStateUnknown, false}, + {"Timeout is not terminal", types.TaskStateTimeout, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isTerminalState(tt.state) + if got != tt.expected { + t.Errorf("isTerminalState(%v) = %v, want %v", tt.state, got, tt.expected) + } + }) + } +} diff --git a/kubernetes/internal/task-executor/runtime/composite.go b/kubernetes/internal/task-executor/runtime/composite.go index 64156609..31c6349e 100644 --- a/kubernetes/internal/task-executor/runtime/composite.go +++ b/kubernetes/internal/task-executor/runtime/composite.go @@ -36,14 +36,10 @@ func NewExecutor(cfg *config.Config) (Executor, error) { } klog.InfoS("process executor initialized.", "enableSidecar", cfg.EnableSidecarMode, "mainContainer", cfg.MainContainerName) - // 2. 
Initialize ContainerExecutor (Optional) - var containerExec Executor - if cfg.EnableContainerMode { - klog.InfoS("container executor initialized", "criSocket", cfg.CRISocket) - containerExec, err = newContainerExecutor(cfg) - if err != nil { - return nil, fmt.Errorf("failed to create container executor: %w", err) - } + // 2. Initialize ContainerExecutor + containerExec, err := newContainerExecutor(cfg) + if err != nil { + return nil, fmt.Errorf("failed to create container executor: %w", err) } // 3. Return Composite return &compositeExecutor{ diff --git a/kubernetes/internal/task-executor/runtime/process.go b/kubernetes/internal/task-executor/runtime/process.go index 63b6e091..340bc781 100644 --- a/kubernetes/internal/task-executor/runtime/process.go +++ b/kubernetes/internal/task-executor/runtime/process.go @@ -240,31 +240,33 @@ func (e *processExecutor) Inspect(ctx context.Context, task *types.Task) (*types status := &types.Status{ State: types.TaskStateUnknown, } + // Prepare a single sub-status for the process + subStatus := types.SubStatus{} var pid int - // 1. Check Exit File (Completed) if exitData, err := os.ReadFile(exitPath); err == nil { fileInfo, _ := os.Stat(exitPath) exitCode, _ := strconv.Atoi(string(exitData)) - status.ExitCode = exitCode + subStatus.ExitCode = exitCode finishedAt := fileInfo.ModTime() - status.FinishedAt = &finishedAt + subStatus.FinishedAt = &finishedAt if exitCode == 0 { status.State = types.TaskStateSucceeded - status.Reason = "Succeeded" + subStatus.Reason = "Succeeded" } else { status.State = types.TaskStateFailed - status.Reason = "Failed" + subStatus.Reason = "Failed" } // Try to read start time from PID file if pidFileInfo, err := os.Stat(pidPath); err == nil { startedAt := pidFileInfo.ModTime() - status.StartedAt = &startedAt + subStatus.StartedAt = &startedAt } + status.SubStatuses = []types.SubStatus{subStatus} return status, nil } @@ -273,25 +275,37 @@ func (e *processExecutor) Inspect(ctx context.Context, task *types.Task) (*types pid, _ = strconv.Atoi(strings.TrimSpace(string(pidData))) fileInfo, _ := os.Stat(pidPath) startedAt := fileInfo.ModTime() - status.StartedAt = &startedAt + subStatus.StartedAt = &startedAt if isProcessRunning(pid) { status.State = types.TaskStateRunning + if task.Process != nil && task.Process.TimeoutSeconds != nil { + timeout := time.Duration(*task.Process.TimeoutSeconds) * time.Second + elapsed := time.Since(startedAt) + if elapsed > timeout { + status.State = types.TaskStateTimeout + subStatus.Reason = "TaskTimeout" + subStatus.Message = fmt.Sprintf("Task exceeded timeout of %d seconds", *task.Process.TimeoutSeconds) + } + } } else { // Process crashed status.State = types.TaskStateFailed - status.ExitCode = 137 // Assume kill/crash - status.Reason = "ProcessCrashed" - status.Message = "Process exited without writing exit code" + subStatus.ExitCode = 137 // Assume kill/crash + subStatus.Reason = "ProcessCrashed" + subStatus.Message = "Process exited without writing exit code" // Use ModTime as FinishedAt for crash approximation - status.FinishedAt = &startedAt + subStatus.FinishedAt = &startedAt } + status.SubStatuses = []types.SubStatus{subStatus} return status, nil } // 3. 
Pending status.State = types.TaskStatePending - status.Reason = "Pending" + subStatus.Reason = "Pending" + status.SubStatuses = []types.SubStatus{subStatus} + return status, nil } diff --git a/kubernetes/internal/task-executor/runtime/process_test.go b/kubernetes/internal/task-executor/runtime/process_test.go index 4101eb5e..4040cb48 100644 --- a/kubernetes/internal/task-executor/runtime/process_test.go +++ b/kubernetes/internal/task-executor/runtime/process_test.go @@ -133,8 +133,9 @@ func TestProcessExecutor_ShortLived(t *testing.T) { if status.State != types.TaskStateSucceeded { t.Errorf("Task should be succeeded, got: %s", status.State) } - if status.ExitCode != 0 { - t.Errorf("Exit code should be 0, got %d", status.ExitCode) + assert.NotEmpty(t, status.SubStatuses) + if status.SubStatuses[0].ExitCode != 0 { + t.Errorf("Exit code should be 0, got %d", status.SubStatuses[0].ExitCode) } } @@ -169,8 +170,10 @@ func TestProcessExecutor_Failure(t *testing.T) { } if status.State != types.TaskStateFailed { t.Errorf("Task should be failed") - } else if status.ExitCode != 1 { - t.Errorf("Exit code should be 1, got %d", status.ExitCode) + } + assert.NotEmpty(t, status.SubStatuses) + if status.SubStatuses[0].ExitCode != 1 { + t.Errorf("Exit code should be 1, got %d", status.SubStatuses[0].ExitCode) } } @@ -213,9 +216,7 @@ func TestShellEscape(t *testing.T) { func TestNewExecutor(t *testing.T) { // 1. Container mode + Host Mode - cfg := &config.Config{ - EnableContainerMode: true, - } + cfg := &config.Config{} e, err := NewExecutor(cfg) if err != nil { t.Fatalf("NewExecutor(container) failed: %v", err) @@ -226,8 +227,7 @@ func TestNewExecutor(t *testing.T) { // 2. Process mode only cfg = &config.Config{ - EnableContainerMode: false, - DataDir: t.TempDir(), + DataDir: t.TempDir(), } e, err = NewExecutor(cfg) if err != nil { @@ -294,3 +294,119 @@ func TestProcessExecutor_EnvInheritance(t *testing.T) { assert.Contains(t, outputStr, expectedHostVar, "Should inherit host environment variables") assert.Contains(t, outputStr, expectedTaskVar, "Should include task-specific environment variables") } + +func TestProcessExecutor_TimeoutDetection(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found") + } + + executor, _ := setupTestExecutor(t) + pExecutor := executor.(*processExecutor) + ctx := context.Background() + + timeoutSec := int64(2) + task := &types.Task{ + Name: "timeout-task", + Process: &api.Process{ + Command: []string{"sleep", "30"}, + TimeoutSeconds: &timeoutSec, + }, + } + taskDir, err := utils.SafeJoin(pExecutor.rootDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + if err := executor.Start(ctx, task); err != nil { + t.Fatalf("Start failed: %v", err) + } + + // Wait for timeout to be detected (2 seconds + margin) + time.Sleep(2500 * time.Millisecond) + + status, err := executor.Inspect(ctx, task) + if err != nil { + t.Fatalf("Inspect failed: %v", err) + } + + // Should detect timeout + assert.Equal(t, types.TaskStateTimeout, status.State, "Task should be in Timeout state") + assert.NotEmpty(t, status.SubStatuses) + assert.Equal(t, "TaskTimeout", status.SubStatuses[0].Reason) + assert.Contains(t, status.SubStatuses[0].Message, "timeout of 2 seconds") + + // Cleanup + executor.Stop(ctx, task) +} + +func TestProcessExecutor_TimeoutNotExceeded(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found") + } + + executor, _ := setupTestExecutor(t) + ctx := context.Background() + + timeoutSec := int64(10) + task := 
&types.Task{ + Name: "quick-task", + Process: &api.Process{ + Command: []string{"echo", "done"}, + TimeoutSeconds: &timeoutSec, + }, + } + taskDir, err := utils.SafeJoin(executor.(*processExecutor).rootDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + if err := executor.Start(ctx, task); err != nil { + t.Fatalf("Start failed: %v", err) + } + + // Wait for process to complete + time.Sleep(200 * time.Millisecond) + + status, err := executor.Inspect(ctx, task) + if err != nil { + t.Fatalf("Inspect failed: %v", err) + } + + // Should be Succeeded, not Timeout + assert.Equal(t, types.TaskStateSucceeded, status.State, "Task should be Succeeded, not Timeout") +} + +func TestProcessExecutor_NoTimeout(t *testing.T) { + if _, err := exec.LookPath("sh"); err != nil { + t.Skip("sh not found") + } + + executor, _ := setupTestExecutor(t) + pExecutor := executor.(*processExecutor) + ctx := context.Background() + + // Task without timeout setting + task := &types.Task{ + Name: "no-timeout-task", + Process: &api.Process{ + Command: []string{"sleep", "1"}, + }, + } + taskDir, err := utils.SafeJoin(pExecutor.rootDir, task.Name) + assert.Nil(t, err) + os.MkdirAll(taskDir, 0755) + + if err := executor.Start(ctx, task); err != nil { + t.Fatalf("Start failed: %v", err) + } + + // Inspect immediately + status, err := executor.Inspect(ctx, task) + if err != nil { + t.Fatalf("Inspect failed: %v", err) + } + + // Should be Running, not Timeout + assert.Equal(t, types.TaskStateRunning, status.State, "Task should be Running when no timeout is set") + + // Cleanup + executor.Stop(ctx, task) +} diff --git a/kubernetes/internal/task-executor/server/handler.go b/kubernetes/internal/task-executor/server/handler.go index 2b0a42ce..7dddfe42 100644 --- a/kubernetes/internal/task-executor/server/handler.go +++ b/kubernetes/internal/task-executor/server/handler.go @@ -18,7 +18,9 @@ import ( "encoding/json" "fmt" "net/http" + "time" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" @@ -247,8 +249,9 @@ func (h *Handler) convertAPIToInternalTask(apiTask *api.Task) *types.Task { return nil } task := &types.Task{ - Name: apiTask.Name, - Process: apiTask.Process, + Name: apiTask.Name, + Process: apiTask.Process, + PodTemplateSpec: apiTask.PodTemplateSpec, } // Initialize default status task.Status = types.Status{ @@ -265,48 +268,142 @@ func convertInternalToAPITask(task *types.Task) *api.Task { } apiTask := &api.Task{ - Name: task.Name, - Process: task.Process, + Name: task.Name, + Process: task.Process, + PodTemplateSpec: task.PodTemplateSpec, } - // Map internal Status to api.ProcessStatus - apiStatus := &api.ProcessStatus{} - - switch task.Status.State { - case types.TaskStatePending: - apiStatus.Waiting = &api.Waiting{ - Reason: task.Status.Reason, - } - case types.TaskStateRunning: - if task.Status.StartedAt != nil { - t := metav1.NewTime(*task.Status.StartedAt) + // 1. 
Process Status Conversion + if task.Process != nil && len(task.Status.SubStatuses) > 0 { + sub := task.Status.SubStatuses[0] + apiStatus := &api.ProcessStatus{} + + // Handle Timeout state - map to Terminated with exitCode 137 + if task.Status.State == types.TaskStateTimeout { + term := &api.Terminated{ + ExitCode: 137, + Reason: sub.Reason, // "TaskTimeout" + Message: sub.Message, // "Task exceeded timeout of X seconds" + } + if sub.StartedAt != nil { + term.StartedAt = metav1.NewTime(*sub.StartedAt) + } + term.FinishedAt = metav1.Now() + apiStatus.Terminated = term + } else if sub.FinishedAt != nil { + // Terminated + term := &api.Terminated{ + ExitCode: int32(sub.ExitCode), + Reason: sub.Reason, + Message: sub.Message, + } + term.FinishedAt = metav1.NewTime(*sub.FinishedAt) + if sub.StartedAt != nil { + term.StartedAt = metav1.NewTime(*sub.StartedAt) + } + apiStatus.Terminated = term + } else if sub.StartedAt != nil { + // Running apiStatus.Running = &api.Running{ - StartedAt: t, + StartedAt: metav1.NewTime(*sub.StartedAt), } } else { - apiStatus.Running = &api.Running{} + // Waiting + apiStatus.Waiting = &api.Waiting{ + Reason: sub.Reason, + Message: sub.Message, + } + } + apiTask.ProcessStatus = apiStatus + } + + // 2. Pod Status Conversion + if task.PodTemplateSpec != nil { + podStatus := &corev1.PodStatus{ + // Default phase mapping + Phase: corev1.PodUnknown, + } + + switch task.Status.State { + case types.TaskStatePending: + podStatus.Phase = corev1.PodPending + case types.TaskStateRunning: + podStatus.Phase = corev1.PodRunning + case types.TaskStateSucceeded: + podStatus.Phase = corev1.PodSucceeded + case types.TaskStateFailed: + podStatus.Phase = corev1.PodFailed } - case types.TaskStateSucceeded, types.TaskStateFailed: - term := &api.Terminated{ - ExitCode: int32(task.Status.ExitCode), - Reason: task.Status.Reason, - Message: task.Status.Message, + + for _, sub := range task.Status.SubStatuses { + cs := corev1.ContainerStatus{ + Name: sub.Name, + } + if sub.FinishedAt != nil { + cs.State.Terminated = &corev1.ContainerStateTerminated{ + ExitCode: int32(sub.ExitCode), + Reason: sub.Reason, + Message: sub.Message, + FinishedAt: metav1.NewTime(*sub.FinishedAt), + } + if sub.StartedAt != nil { + cs.State.Terminated.StartedAt = metav1.NewTime(*sub.StartedAt) + } + } else if sub.StartedAt != nil { + cs.State.Running = &corev1.ContainerStateRunning{ + StartedAt: metav1.NewTime(*sub.StartedAt), + } + cs.Ready = true + } else { + cs.State.Waiting = &corev1.ContainerStateWaiting{ + Reason: sub.Reason, + Message: sub.Message, + } + } + podStatus.ContainerStatuses = append(podStatus.ContainerStatuses, cs) } - if task.Status.StartedAt != nil { - t := metav1.NewTime(*task.Status.StartedAt) - term.StartedAt = t + + allReady := len(podStatus.ContainerStatuses) > 0 + for _, cs := range podStatus.ContainerStatuses { + if !cs.Ready { + allReady = false + break + } } - if task.Status.FinishedAt != nil { - t := metav1.NewTime(*task.Status.FinishedAt) - term.FinishedAt = t + readyStatus := corev1.ConditionFalse + if allReady { + readyStatus = corev1.ConditionTrue } - apiStatus.Terminated = term - default: - apiStatus.Waiting = &api.Waiting{ - Reason: "Unknown", + + var latestTransition time.Time + for _, sub := range task.Status.SubStatuses { + if sub.StartedAt != nil && sub.StartedAt.After(latestTransition) { + latestTransition = *sub.StartedAt + } + if sub.FinishedAt != nil && sub.FinishedAt.After(latestTransition) { + latestTransition = *sub.FinishedAt + } } + ltt := 
metav1.NewTime(latestTransition) + if latestTransition.IsZero() { + ltt = metav1.Now() + } + + podStatus.Conditions = append(podStatus.Conditions, + corev1.PodCondition{ + Type: corev1.PodReady, + Status: readyStatus, + LastTransitionTime: ltt, + }, + corev1.PodCondition{ + Type: corev1.ContainersReady, + Status: readyStatus, + LastTransitionTime: ltt, + }, + ) + + apiTask.PodStatus = podStatus } - apiTask.ProcessStatus = apiStatus return apiTask } diff --git a/kubernetes/internal/task-executor/server/handler_test.go b/kubernetes/internal/task-executor/server/handler_test.go index 68ebf8d8..3cadac5c 100644 --- a/kubernetes/internal/task-executor/server/handler_test.go +++ b/kubernetes/internal/task-executor/server/handler_test.go @@ -23,9 +23,14 @@ import ( "net/http" "net/http/httptest" "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/config" "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/task-executor/types" + "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/utils" api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" ) @@ -241,3 +246,184 @@ func TestHandler_Errors(t *testing.T) { t.Errorf("CreateTask should fail with 500, got %d", w.Code) } } + +func TestConvertInternalToAPITask(t *testing.T) { + now := time.Now() + + t.Run("Process Task", func(t *testing.T) { + task := &types.Task{ + Name: "proc-task", + Process: &api.Process{Command: []string{"ls"}}, + Status: types.Status{ + State: types.TaskStateSucceeded, + SubStatuses: []types.SubStatus{ + { + ExitCode: 0, + Reason: "Completed", + FinishedAt: &now, + }, + }, + }, + } + + apiTask := convertInternalToAPITask(task) + assert.NotNil(t, apiTask.ProcessStatus) + assert.NotNil(t, apiTask.ProcessStatus.Terminated) + assert.Equal(t, int32(0), apiTask.ProcessStatus.Terminated.ExitCode) + assert.Nil(t, apiTask.PodStatus) + }) + + t.Run("Pod Task - Partially Ready", func(t *testing.T) { + task := &types.Task{ + Name: "pod-task-partial", + PodTemplateSpec: &corev1.PodTemplateSpec{}, + Status: types.Status{ + State: types.TaskStateRunning, + SubStatuses: []types.SubStatus{ + { + Name: "c1", + StartedAt: &now, + }, + { + Name: "c2", + Reason: "Pending", + }, + }, + }, + } + + apiTask := convertInternalToAPITask(task) + assert.NotNil(t, apiTask.PodStatus) + assert.Equal(t, corev1.PodRunning, apiTask.PodStatus.Phase) + assert.Len(t, apiTask.PodStatus.ContainerStatuses, 2) + assert.True(t, apiTask.PodStatus.ContainerStatuses[0].Ready) + assert.False(t, apiTask.PodStatus.ContainerStatuses[1].Ready) + assert.False(t, utils.IsPodReadyConditionTrue(*apiTask.PodStatus)) + + // Conditions check + var podReady, containersReady *corev1.PodCondition + for i := range apiTask.PodStatus.Conditions { + c := &apiTask.PodStatus.Conditions[i] + if c.Type == corev1.PodReady { + podReady = c + } else if c.Type == corev1.ContainersReady { + containersReady = c + } + } + assert.NotNil(t, podReady) + assert.Equal(t, corev1.ConditionFalse, podReady.Status) + assert.NotNil(t, containersReady) + assert.Equal(t, corev1.ConditionFalse, containersReady.Status) + assert.Equal(t, now.Unix(), podReady.LastTransitionTime.Unix()) + }) + + t.Run("Pod Task - Fully Ready", func(t *testing.T) { + later := now.Add(time.Minute) + task := &types.Task{ + Name: "pod-task-ready", + PodTemplateSpec: &corev1.PodTemplateSpec{}, + Status: types.Status{ + State: types.TaskStateRunning, + SubStatuses: []types.SubStatus{ + { + Name: "c1", + StartedAt: &now, + }, + { + 
Name: "c2", + StartedAt: &later, + }, + }, + }, + } + + apiTask := convertInternalToAPITask(task) + assert.NotNil(t, apiTask.PodStatus) + + // Conditions check + var podReady, containersReady *corev1.PodCondition + for i := range apiTask.PodStatus.Conditions { + c := &apiTask.PodStatus.Conditions[i] + if c.Type == corev1.PodReady { + podReady = c + } else if c.Type == corev1.ContainersReady { + containersReady = c + } + } + assert.NotNil(t, podReady) + assert.Equal(t, corev1.ConditionTrue, podReady.Status) + assert.NotNil(t, containersReady) + assert.Equal(t, corev1.ConditionTrue, containersReady.Status) + // Should use the latest timestamp (later) + assert.Equal(t, later.Unix(), podReady.LastTransitionTime.Unix()) + assert.True(t, utils.IsPodReadyConditionTrue(*apiTask.PodStatus)) + }) +} + +func TestConvertInternalToAPITask_Timeout(t *testing.T) { + now := time.Now() + timeoutSec := int64(60) + + t.Run("Process Task Timeout", func(t *testing.T) { + task := &types.Task{ + Name: "timeout-task", + Process: &api.Process{ + Command: []string{"sleep", "100"}, + TimeoutSeconds: &timeoutSec, + }, + Status: types.Status{ + State: types.TaskStateTimeout, + SubStatuses: []types.SubStatus{ + { + Reason: "TaskTimeout", + Message: "Task exceeded timeout of 60 seconds", + StartedAt: &now, + FinishedAt: nil, // Not finished yet + }, + }, + }, + } + + apiTask := convertInternalToAPITask(task) + + // Should map to Terminated with exit code 137 + assert.NotNil(t, apiTask.ProcessStatus) + assert.NotNil(t, apiTask.ProcessStatus.Terminated) + assert.Nil(t, apiTask.ProcessStatus.Running) + assert.Nil(t, apiTask.ProcessStatus.Waiting) + assert.Equal(t, int32(137), apiTask.ProcessStatus.Terminated.ExitCode) + assert.Equal(t, "TaskTimeout", apiTask.ProcessStatus.Terminated.Reason) + assert.Equal(t, "Task exceeded timeout of 60 seconds", apiTask.ProcessStatus.Terminated.Message) + assert.Equal(t, now.Unix(), apiTask.ProcessStatus.Terminated.StartedAt.Unix()) + // FinishedAt should be set to "now" for timeout + assert.False(t, apiTask.ProcessStatus.Terminated.FinishedAt.IsZero()) + assert.Nil(t, apiTask.PodStatus) + }) + + t.Run("Timeout After Completion", func(t *testing.T) { + later := now.Add(2 * time.Minute) + task := &types.Task{ + Name: "completed-task", + Process: &api.Process{Command: []string{"ls"}}, + Status: types.Status{ + State: types.TaskStateFailed, // After stop, it becomes Failed + SubStatuses: []types.SubStatus{ + { + ExitCode: 137, + Reason: "Killed", + StartedAt: &now, + FinishedAt: &later, + }, + }, + }, + } + + apiTask := convertInternalToAPITask(task) + + // Should be Terminated with actual exit code + assert.NotNil(t, apiTask.ProcessStatus.Terminated) + assert.Equal(t, int32(137), apiTask.ProcessStatus.Terminated.ExitCode) + assert.Equal(t, now.Unix(), apiTask.ProcessStatus.Terminated.StartedAt.Unix()) + assert.Equal(t, later.Unix(), apiTask.ProcessStatus.Terminated.FinishedAt.Unix()) + }) +} diff --git a/kubernetes/internal/task-executor/types/task.go b/kubernetes/internal/task-executor/types/task.go index cd139f6f..a4bb1ad4 100644 --- a/kubernetes/internal/task-executor/types/task.go +++ b/kubernetes/internal/task-executor/types/task.go @@ -31,12 +31,19 @@ const ( TaskStateSucceeded TaskState = "Succeeded" TaskStateFailed TaskState = "Failed" TaskStateUnknown TaskState = "Unknown" + TaskStateNotFound TaskState = "NotFound" + TaskStateTimeout TaskState = "Timeout" ) // Status represents the internal status of a task. // This is decoupled from the Kubernetes API status. 
type Status struct { - State TaskState `json:"state"` + State TaskState `json:"state"` + SubStatuses []SubStatus `json:"subStatuses,omitempty"` +} + +type SubStatus struct { + Name string `json:"name,omitempty"` // for process it's empty, for PodTemplateSpec is container name Reason string `json:"reason,omitempty"` Message string `json:"message,omitempty"` ExitCode int `json:"exitCode,omitempty"` diff --git a/kubernetes/pkg/task-executor/types.go b/kubernetes/pkg/task-executor/types.go index 3631d22c..5a3ba5ab 100644 --- a/kubernetes/pkg/task-executor/types.go +++ b/kubernetes/pkg/task-executor/types.go @@ -41,6 +41,8 @@ type Process struct { Env []corev1.EnvVar `json:"env,omitempty"` // WorkingDir process working directory. WorkingDir string `json:"workingDir,omitempty"` + // TimeoutSeconds process timeout seconds. + TimeoutSeconds *int64 `json:"timeoutSeconds,omitempty"` } // ProcessStatus holds a possible state of process. diff --git a/kubernetes/test/e2e_task/task_e2e_test.go b/kubernetes/test/e2e_task/task_e2e_test.go index 5843f20b..d2eea840 100644 --- a/kubernetes/test/e2e_task/task_e2e_test.go +++ b/kubernetes/test/e2e_task/task_e2e_test.go @@ -193,4 +193,119 @@ var _ = Describe("Task Executor E2E", Ordered, func() { }, 5*time.Second, 500*time.Millisecond).Should(BeNil()) }) }) + + Context("When creating a task with timeout", func() { + taskName := "e2e-timeout-test" + + It("should timeout and be terminated", func() { + By("Creating task with 5 second timeout that runs for 30 seconds") + timeoutSec := int64(5) + task := &api.Task{ + Name: taskName, + Process: &api.Process{ + Command: []string{"sleep", "30"}, + TimeoutSeconds: &timeoutSec, + }, + } + _, err := client.Set(context.Background(), task) + Expect(err).NotTo(HaveOccurred()) + + By("Waiting for task to be terminated (within 15 seconds)") + // After timeout detection, Stop is called and the process is killed. + // Once Stop completes, the exit file is written and state becomes Failed. 
+ Eventually(func(g Gomega) { + got, err := client.Get(context.Background()) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(got).NotTo(BeNil()) + g.Expect(got.Name).To(Equal(taskName)) + + // Should be Terminated with exit code 137 (SIGKILL) or 143 (SIGTERM) + // sleep responds to SIGTERM quickly, so we usually get 143 + // The state will be "Failed" after exit file is written + if got.ProcessStatus != nil && got.ProcessStatus.Terminated != nil { + g.Expect(got.ProcessStatus.Terminated.ExitCode).To(SatisfyAny( + Equal(int32(137)), // SIGKILL + Equal(int32(143)), // SIGTERM + )) + } else { + // Fail if not terminated yet + g.Expect(got.ProcessStatus).NotTo(BeNil(), "Task ProcessStatus is nil") + g.Expect(got.ProcessStatus.Terminated).NotTo(BeNil(), "Task status: %v", got.ProcessStatus) + } + }, 15*time.Second, 1*time.Second).Should(Succeed()) + + By("Verifying the task was terminated") + got, err := client.Get(context.Background()) + Expect(err).NotTo(HaveOccurred()) + Expect(got.ProcessStatus.Terminated).NotTo(BeNil()) + Expect(got.ProcessStatus.Terminated.ExitCode).To(SatisfyAny( + Equal(int32(137)), // SIGKILL + Equal(int32(143)), // SIGTERM + )) + // State could be "Failed" (after exit file written) or "Timeout" (during stop) + Expect(got.ProcessStatus.Terminated.Reason).To(SatisfyAny( + Equal("Failed"), + Equal("TaskTimeout"), + )) + }) + + It("should be deletable after timeout", func() { + By("Deleting task") + _, err := client.Set(context.Background(), nil) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying deletion") + Eventually(func() *api.Task { + got, _ := client.Get(context.Background()) + return got + }, 5*time.Second, 500*time.Millisecond).Should(BeNil()) + }) + }) + + Context("When creating a task that completes before timeout", func() { + taskName := "e2e-no-timeout-test" + + It("should succeed without timeout", func() { + By("Creating task with 60 second timeout that completes in 2 seconds") + timeoutSec := int64(60) + task := &api.Task{ + Name: taskName, + Process: &api.Process{ + Command: []string{"sleep", "2"}, + TimeoutSeconds: &timeoutSec, + }, + } + _, err := client.Set(context.Background(), task) + Expect(err).NotTo(HaveOccurred()) + + By("Waiting for task to succeed") + Eventually(func(g Gomega) { + got, err := client.Get(context.Background()) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(got).NotTo(BeNil()) + g.Expect(got.Name).To(Equal(taskName)) + + // Should succeed with exit code 0 + if got.ProcessStatus != nil && got.ProcessStatus.Terminated != nil { + g.Expect(got.ProcessStatus.Terminated.ExitCode).To(BeZero()) + g.Expect(got.ProcessStatus.Terminated.Reason).To(Equal("Succeeded")) + } else { + g.Expect(got.ProcessStatus).NotTo(BeNil(), "Task ProcessStatus is nil") + g.Expect(got.ProcessStatus.Terminated).NotTo(BeNil(), "Task status: %v", got.ProcessStatus) + } + }, 10*time.Second, 1*time.Second).Should(Succeed()) + }) + + It("should be deletable", func() { + By("Deleting task") + _, err := client.Set(context.Background(), nil) + Expect(err).NotTo(HaveOccurred()) + + By("Verifying deletion") + Eventually(func() *api.Task { + got, _ := client.Get(context.Background()) + return got + }, 5*time.Second, 500*time.Millisecond).Should(BeNil()) + }) + }) })
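
Usage sketch (reviewer note appended after the patch, not part of the diff): the new Process.TimeoutSeconds field bounds a task's runtime. Once the executor detects the deadline in Inspect it reports the internal Timeout state, the manager stops the task asynchronously, and the API conversion surfaces it as Terminated with reason "TaskTimeout" and exit code 137 (or 143 once SIGTERM lands and the exit file is written). A minimal caller-side sketch follows; the NewClient constructor and its address argument are assumptions for illustration only, while Set/Get mirror the calls used in the e2e test above.

    package main

    import (
        "context"
        "fmt"
        "time"

        api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor"
    )

    func main() {
        ctx := context.Background()

        // Hypothetical constructor; the real client entry point may differ.
        client := api.NewClient("127.0.0.1:5758")

        timeout := int64(5) // terminate the task if it runs longer than 5 seconds
        task := &api.Task{
            Name: "bounded-task",
            Process: &api.Process{
                Command:        []string{"sleep", "30"},
                TimeoutSeconds: &timeout, // field added by this patch
            },
        }

        // Set replaces the single managed task (passing nil deletes it), as in the e2e test.
        if _, err := client.Set(ctx, task); err != nil {
            panic(err)
        }

        // Poll until the executor reports termination. A timed-out task shows up as
        // Terminated with reason "TaskTimeout" (or "Failed" after the exit file is
        // written) and exit code 137 or 143.
        for {
            got, err := client.Get(ctx)
            if err != nil || got == nil {
                break
            }
            if got.ProcessStatus != nil && got.ProcessStatus.Terminated != nil {
                fmt.Println(got.ProcessStatus.Terminated.Reason, got.ProcessStatus.Terminated.ExitCode)
                break
            }
            time.Sleep(500 * time.Millisecond)
        }
    }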