5 changes: 0 additions & 5 deletions kubernetes/cmd/task-executor/main.go
@@ -53,11 +53,6 @@ func main() {
klog.ErrorS(err, "failed to create executor")
os.Exit(1)
}
mode := "process"
if cfg.EnableContainerMode {
mode = "container"
}
klog.InfoS("executor initialized", "mode", mode)

// Initialize TaskManager
taskManager, err := manager.NewTaskManager(cfg, taskStore, exec)
@@ -69,17 +69,19 @@ func (s *DefaultTaskSchedulingStrategy) getTaskSpec(batchSbx *sandboxv1alpha1.Ba
return nil, fmt.Errorf("batchsandbox: failed to unmarshal %s to TaskTemplateSpec, idx %d, err %w", modified, idx, err)
}
task.Process = &api.Process{
Command: newTaskTemplate.Spec.Process.Command,
Args: newTaskTemplate.Spec.Process.Args,
Env: newTaskTemplate.Spec.Process.Env,
WorkingDir: newTaskTemplate.Spec.Process.WorkingDir,
Command: newTaskTemplate.Spec.Process.Command,
Args: newTaskTemplate.Spec.Process.Args,
Env: newTaskTemplate.Spec.Process.Env,
WorkingDir: newTaskTemplate.Spec.Process.WorkingDir,
TimeoutSeconds: batchSbx.Spec.TaskTemplate.Spec.TimeoutSeconds,
}
} else if batchSbx.Spec.TaskTemplate != nil && batchSbx.Spec.TaskTemplate.Spec.Process != nil {
task.Process = &api.Process{
Command: batchSbx.Spec.TaskTemplate.Spec.Process.Command,
Args: batchSbx.Spec.TaskTemplate.Spec.Process.Args,
Env: batchSbx.Spec.TaskTemplate.Spec.Process.Env,
WorkingDir: batchSbx.Spec.TaskTemplate.Spec.Process.WorkingDir,
Command: batchSbx.Spec.TaskTemplate.Spec.Process.Command,
Args: batchSbx.Spec.TaskTemplate.Spec.Process.Args,
Env: batchSbx.Spec.TaskTemplate.Spec.Process.Env,
WorkingDir: batchSbx.Spec.TaskTemplate.Spec.Process.WorkingDir,
TimeoutSeconds: batchSbx.Spec.TaskTemplate.Spec.TimeoutSeconds,
}
}
return task, nil
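Both branches above now copy TimeoutSeconds from the task template into api.Process alongside the process fields. As a hedged illustration of what such a field usually feeds into, here is a minimal sketch of deadline enforcement; it assumes TimeoutSeconds is an integer count of seconds, and runWithTimeout is an illustrative name, not something this PR adds:

    import (
        "context"
        "time"
    )

    // runWithTimeout bounds run() with a deadline derived from timeoutSeconds.
    // A non-positive value is treated as "no timeout".
    func runWithTimeout(ctx context.Context, timeoutSeconds int64, run func(context.Context) error) error {
        if timeoutSeconds > 0 {
            var cancel context.CancelFunc
            ctx, cancel = context.WithTimeout(ctx, time.Duration(timeoutSeconds)*time.Second)
            defer cancel()
        }
        return run(ctx)
    }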
38 changes: 16 additions & 22 deletions kubernetes/internal/task-executor/config/config.go
@@ -21,28 +21,26 @@ import (
)

type Config struct {
DataDir string
ListenAddr string
CRISocket string
ReadTimeout time.Duration
WriteTimeout time.Duration
ReconcileInterval time.Duration
EnableSidecarMode bool
EnableContainerMode bool
MainContainerName string
DataDir string
ListenAddr string
CRISocket string
ReadTimeout time.Duration
WriteTimeout time.Duration
ReconcileInterval time.Duration
EnableSidecarMode bool
MainContainerName string
}

func NewConfig() *Config {
return &Config{
DataDir: "/var/lib/sandbox/tasks",
ListenAddr: "0.0.0.0:5758",
CRISocket: "/var/run/containerd/containerd.sock",
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
ReconcileInterval: 500 * time.Millisecond,
EnableContainerMode: false,
EnableSidecarMode: false,
MainContainerName: "main",
DataDir: "/var/lib/sandbox/tasks",
ListenAddr: "0.0.0.0:5758",
CRISocket: "/var/run/containerd/containerd.sock",
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
ReconcileInterval: 500 * time.Millisecond,
EnableSidecarMode: false,
MainContainerName: "main",
}
}

@@ -56,9 +54,6 @@ func (c *Config) LoadFromEnv() {
if v := os.Getenv("CRI_SOCKET"); v != "" {
c.CRISocket = v
}
if v := os.Getenv("ENABLE_CONTAINER_MODE"); v == "true" {
c.EnableContainerMode = true
}
if v := os.Getenv("ENABLE_SIDECAR_MODE"); v == "true" {
c.EnableSidecarMode = true
}
@@ -71,7 +66,6 @@ func (c *Config) LoadFromFlags() {
flag.StringVar(&c.DataDir, "data-dir", c.DataDir, "data storage directory")
flag.StringVar(&c.ListenAddr, "listen-addr", c.ListenAddr, "service listen address")
flag.StringVar(&c.CRISocket, "cri-socket", c.CRISocket, "CRI socket path for container runner mode")
flag.BoolVar(&c.EnableContainerMode, "enable-container-mode", c.EnableContainerMode, "enable container runner mode")
flag.BoolVar(&c.EnableSidecarMode, "enable-sidecar-mode", c.EnableSidecarMode, "enable sidecar runner mode")
flag.StringVar(&c.MainContainerName, "main-container-name", c.MainContainerName, "main container name")
flag.Parse()
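With EnableContainerMode removed from the struct, the defaults, the env loader, and the flag set, three configuration layers remain. The call site in main.go is outside this hunk; the sketch below assumes the conventional wiring order (defaults first, then env, then flags) so that command-line flags take precedence:

    cfg := config.NewConfig() // code defaults, e.g. ListenAddr "0.0.0.0:5758"
    cfg.LoadFromEnv()         // env overrides, e.g. ENABLE_SIDECAR_MODE=true
    cfg.LoadFromFlags()       // flag overrides, e.g. -enable-sidecar-mode; calls flag.Parse()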
200 changes: 97 additions & 103 deletions kubernetes/internal/task-executor/manager/task_manager.go
@@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"

@@ -37,11 +38,14 @@ const (
type taskManager struct {
mu sync.RWMutex
tasks map[string]*types.Task // name -> task
// TODO we need design queue for pending tasks
activeTasks int // Count of active tasks (not deleted AND not terminated)
store store.TaskStore
executor runtime.Executor
config *config.Config

store store.TaskStore
executor runtime.Executor
config *config.Config

// stopping tracks tasks that are currently being stopped.
// This prevents duplicate Stop calls and status rollback during async stop.
stopping map[string]bool

// Reconcile loop control
stopCh chan struct{}
@@ -65,6 +69,7 @@ func NewTaskManager(cfg *config.Config, taskStore store.TaskStore, exec runtime.
store: taskStore,
executor: exec,
config: cfg,
stopping: make(map[string]bool),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
}, nil
@@ -83,6 +88,18 @@ func (m *taskManager) isTaskActive(task *types.Task) bool {
return state == types.TaskStatePending || state == types.TaskStateRunning
}

// countActiveTasks counts tasks that are active (not deleted AND not terminated).
// Must be called with lock held.
func (m *taskManager) countActiveTasks() int {
count := 0
for _, task := range m.tasks {
if m.isTaskActive(task) {
count++
}
}
return count
}

// Create creates a new task and starts execution.
func (m *taskManager) Create(ctx context.Context, task *types.Task) (*types.Task, error) {
if task == nil {
@@ -100,8 +117,8 @@ func (m *taskManager) Create(ctx context.Context, task *types.Task) (*types.Task
return nil, fmt.Errorf("task %s already exists", task.Name)
}

// Enforce single task limitation using the cached counter
if m.activeTasks >= maxConcurrentTasks {
// Enforce single task limitation using real-time count
if m.countActiveTasks() >= maxConcurrentTasks {
return nil, fmt.Errorf("maximum concurrent tasks (%d) reached, cannot create new task", maxConcurrentTasks)
}

@@ -133,14 +150,10 @@
// Safety fallback: Ensure task has a state
if task.Status.State == "" {
task.Status.State = types.TaskStatePending
task.Status.Reason = "Initialized"
}

// Add to memory
m.tasks[task.Name] = task
if m.isTaskActive(task) {
m.activeTasks++
}

klog.InfoS("task created successfully", "name", task.Name)
return task, nil
@@ -244,11 +257,6 @@ func (m *taskManager) softDeleteLocked(ctx context.Context, task *types.Task) er
return nil // Already marked
}

// If the task was active, decrement the active count
if m.isTaskActive(task) {
m.activeTasks--
}

now := time.Now()
task.DeletionTimestamp = &now

@@ -294,8 +302,8 @@ func (m *taskManager) createTaskLocked(ctx context.Context, task *types.Task) er
return fmt.Errorf("task %s already exists", task.Name)
}

// Enforce single task limitation using the cached counter
if m.activeTasks >= maxConcurrentTasks {
// Enforce single task limitation using real-time count
if m.countActiveTasks() >= maxConcurrentTasks {
return fmt.Errorf("maximum concurrent tasks (%d) reached, cannot create new task", maxConcurrentTasks)
}

@@ -324,36 +332,6 @@

// Add to memory
m.tasks[task.Name] = task
if m.isTaskActive(task) {
m.activeTasks++
}
return nil
}

// deleteTaskLocked deletes a task without acquiring the lock (must be called with lock held).
func (m *taskManager) deleteTaskLocked(ctx context.Context, name string) error {
task, exists := m.tasks[name]
if !exists {
// Already deleted, no error
klog.InfoS("task not found, skipping delete", "name", name)
return nil
}

// Stop task execution
if err := m.executor.Stop(ctx, task); err != nil {
klog.ErrorS(err, "failed to stop task", "name", name)
// Continue with deletion even if stop fails
}

// Delete from store
if err := m.store.Delete(ctx, name); err != nil {
return fmt.Errorf("failed to delete task from store: %w", err)
}

// Remove from memory
delete(m.tasks, name)

klog.InfoS("task deleted successfully", "name", name)
return nil
}

@@ -398,11 +376,6 @@ func (m *taskManager) recoverTasks(ctx context.Context) error {
// Add to memory
m.tasks[task.Name] = task

// Update active count
if m.isTaskActive(task) {
m.activeTasks++
}

klog.InfoS("recovered task", "name", task.Name, "state", task.Status.State, "deleting", task.DeletionTimestamp != nil)
}

@@ -432,71 +405,92 @@ func (m *taskManager) reconcileLoop(ctx context.Context) {

// reconcileTasks updates the status of all tasks and handles deletion.
func (m *taskManager) reconcileTasks(ctx context.Context) {
m.mu.RLock()
tasks := make([]*types.Task, 0, len(m.tasks))
for _, task := range m.tasks {
if task != nil {
tasks = append(tasks, task)
}
}
m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()

// Update each task's status
for _, task := range tasks {
var tasksToDelete []string

for name, task := range m.tasks {
if task == nil {
continue
}
status, err := m.executor.Inspect(ctx, task)
if err != nil {
klog.ErrorS(err, "failed to inspect task", "name", task.Name)
klog.ErrorS(err, "failed to inspect task", "name", name)
continue
}
state := status.State

// Acquire lock to safely update status and active count
m.mu.Lock()
wasActive := m.isTaskActive(task)
// Determine if we should stop the task
shouldStop := false
stopReason := ""

// Update status
task.Status = *status
if task.DeletionTimestamp != nil && !m.stopping[name] {
if !isTerminalState(state) {
shouldStop = true
stopReason = "deletion requested"
}
} else if state == types.TaskStateTimeout && !m.stopping[name] {
shouldStop = true
stopReason = "timeout exceeded"
}

isActive := m.isTaskActive(task)
if shouldStop {
klog.InfoS("stopping task", "name", name, "reason", stopReason)
m.stopping[name] = true

// If task transitioned from Active -> Inactive (Terminated), decrement active count
if wasActive && !isActive {
m.activeTasks--
}
m.mu.Unlock()

// Handle Deletion
if task.DeletionTimestamp != nil {
if task.Status.State == types.TaskStateSucceeded || task.Status.State == types.TaskStateFailed {
// Task is fully terminated, finalize deletion (remove from store/memory)
klog.InfoS("task terminated, finalizing deletion", "name", task.Name)
m.mu.Lock()
if err := m.deleteTaskLocked(ctx, task.Name); err != nil {
klog.ErrorS(err, "failed to finalize task deletion", "name", task.Name)
// Async stop to avoid blocking the reconcile loop
go func(t *types.Task, taskName string) {
defer func() {
m.mu.Lock()
delete(m.stopping, taskName)
m.mu.Unlock()
}()

if err := m.executor.Stop(ctx, t); err != nil {
klog.ErrorS(err, "failed to stop task", "name", taskName)
}
m.mu.Unlock()
continue
} else {
// Task is still running, trigger Stop
klog.InfoS("stopping task marked for deletion", "name", task.Name)
if err := m.executor.Stop(ctx, task); err != nil {
klog.ErrorS(err, "failed to stop task", "name", task.Name)
klog.InfoS("task stopped", "name", taskName)
}(task, name)
}

// Determine if we can finalize deletion
if task.DeletionTimestamp != nil && isTerminalState(state) {
klog.InfoS("task terminated, finalizing deletion", "name", name)
tasksToDelete = append(tasksToDelete, name)
}

// Update status only if not stopping (prevent status rollback during async stop)
if !m.stopping[name] {
if !reflect.DeepEqual(task.Status, *status) {
task.Status = *status
if err := m.store.Update(ctx, task); err != nil {
klog.ErrorS(err, "failed to update task status in store", "name", name)
}
}
}
}

// Update task status in memory only.
// We do not need to persist to store here because Persistent fields (Spec, PID, etc.) do not change during the reconcile loop.
// The Status struct IS persisted, but we choose not to persist every few seconds if only runtime state changes.
// However, since we made Status a first-class citizen and it's small, we COULD persist it.
// But for performance, we stick to the decision: only persist on significant changes (Create/Delete).
// Note: If we want to persist ExitCode/FinishedAt, we might need to Update store when state changes to Terminated.
// Let's add that optimization: if state changed to Terminated, persist it.
if wasActive && !isActive {
if err := m.store.Update(ctx, task); err != nil {
klog.ErrorS(err, "failed to update task status in store", "name", task.Name)
}
// Finalize deletions
for _, name := range tasksToDelete {
if _, exists := m.tasks[name]; !exists {
continue
}

if err := m.store.Delete(ctx, name); err != nil {
klog.ErrorS(err, "failed to delete task from store", "name", name)
continue
}

delete(m.tasks, name)
delete(m.stopping, name)
klog.InfoS("task deleted successfully", "name", name)
}
}

// createTaskLocked creates a task without acquiring the lock (must be called with lock held).
// isTerminalState returns true if the task will not transition to another state.
func isTerminalState(state types.TaskState) bool {
return state == types.TaskStateSucceeded ||
state == types.TaskStateFailed ||
state == types.TaskStateNotFound
}
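The rewrite above replaces the cached activeTasks counter with countActiveTasks() computed under the lock, and routes every Stop through the stopping map so that a slow runtime cannot receive duplicate Stop calls and a stale Inspect result cannot roll the status back while a stop is in flight. Reduced to its moving parts, the dedup pattern looks roughly like the sketch below; stopper and tryStop are illustrative names, not the PR's API:

    import "sync"

    type stopper struct {
        mu       sync.Mutex
        stopping map[string]bool // task name -> a stop is in flight
    }

    // tryStop launches an asynchronous stop unless one is already running.
    // It reports whether a new stop was started.
    func (s *stopper) tryStop(name string, stop func() error) bool {
        s.mu.Lock()
        if s.stopping[name] {
            s.mu.Unlock()
            return false // duplicate request; a stop is already in flight
        }
        s.stopping[name] = true
        s.mu.Unlock()

        go func() {
            defer func() {
                s.mu.Lock()
                delete(s.stopping, name) // re-enable status updates and deletion
                s.mu.Unlock()
            }()
            _ = stop() // the real loop logs this error instead of discarding it
        }()
        return true
    }

While the flag is set, reconcileTasks skips the status write for that task, which is what prevents the rollback mentioned in the struct comment; once the goroutine clears the flag, the next reconcile pass observes the terminal state and finalizes any pending deletion.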