Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 44 additions & 51 deletions internal/nodes/node_drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,15 @@ import (
"github.com/castai/cluster-controller/internal/waitext"
)

type EvictRequest struct {
Node string
CastNamespace string
SkipDeletedTimeoutSeconds int
}

type DrainRequest struct {
Node string
CastNamespace string
SkipDeletedTimeoutSeconds int
DeleteOptions meta.DeleteOptions
}

type Drainer interface {
Evict(ctx context.Context, data EvictRequest) ([]*core.Pod, error)
Drain(ctx context.Context, data DrainRequest) ([]*core.Pod, error)
Evict(ctx context.Context, data DrainRequest) ([]*core.Pod, error)
Drain(ctx context.Context, data DrainRequest, opts meta.DeleteOptions) ([]*core.Pod, error)
}

type DrainerConfig struct {
Expand All @@ -49,6 +42,11 @@ type drainer struct {
log logrus.FieldLogger
}

type (
drainFn func(ctx context.Context, toEvict []*core.Pod) ([]*core.Pod, []*core.Pod, error)
deleteFn func(ctx context.Context, pod core.Pod) error
)

func NewDrainer(
pods informer.PodInformer,
client *k8s.Client,
Expand All @@ -64,50 +62,61 @@ func NewDrainer(
return m
}

func (d *drainer) Drain(ctx context.Context, data DrainRequest) ([]*core.Pod, error) {
func (d *drainer) Drain(ctx context.Context, data DrainRequest, opts meta.DeleteOptions) ([]*core.Pod, error) {
logger := logger.FromContext(ctx, d.log)

logger.Info("starting drain")
pods, err := d.list(ctx, data.Node)
if err != nil {
return nil, err
}

toEvict := d.prioritizePods(pods, data.CastNamespace, data.SkipDeletedTimeoutSeconds)
if len(toEvict) == 0 {
return []*core.Pod{}, nil
fn := func(ctx context.Context, toEvict []*core.Pod) ([]*core.Pod, []*core.Pod, error) {
deletePod := func(ctx context.Context, pod core.Pod) error {
return d.client.DeletePod(ctx, opts, pod, d.cfg.PodDeleteRetries, d.cfg.PodEvictRetryDelay)
}
return d.try(ctx, toEvict, "delete-pod", deletePod)
}

_, failed, err := d.tryDrain(ctx, toEvict, data.DeleteOptions)
if err != nil && !errors.Is(err, &k8s.PodFailedActionError{}) {
return nil, err
}

err = d.waitTerminaition(ctx, data.Node, failed)
failed, err := d.do(ctx, data, fn)
if err != nil {
return []*core.Pod{}, err
return nil, err
}

logger.Info("drain finished")

return failed, nil
}

func (d *drainer) tryDrain(ctx context.Context, toEvict []*core.Pod, options meta.DeleteOptions) ([]*core.Pod, []*core.Pod, error) {
deletePod := func(ctx context.Context, pod core.Pod) error {
return d.client.DeletePod(ctx, options, pod, d.cfg.PodDeleteRetries, d.cfg.PodEvictRetryDelay)
}

successful, podsWithFailedAction := d.client.ExecuteBatchPodActions(ctx, toEvict, deletePod, "delete-pod")
func (d *drainer) try(ctx context.Context, toEvict []*core.Pod, action string, remove deleteFn) ([]*core.Pod, []*core.Pod, error) {
successful, podsWithFailedAction := d.client.ExecuteBatchPodActions(ctx, toEvict, remove, action)
failed, err := d.handleFailures(ctx, "deletion", podsWithFailedAction)
return successful, failed, err
}

func (d *drainer) Evict(ctx context.Context, data EvictRequest) ([]*core.Pod, error) {
func (d *drainer) Evict(ctx context.Context, data DrainRequest) ([]*core.Pod, error) {
logger := logger.FromContext(ctx, d.log)

logger.Info("starting eviction")

fn := func(ctx context.Context, toEvict []*core.Pod) ([]*core.Pod, []*core.Pod, error) {
groupVersion, err := drain.CheckEvictionSupport(d.client.Clientset())
if err != nil {
return nil, nil, err
}
evictPod := func(ctx context.Context, pod core.Pod) error {
return d.client.EvictPod(ctx, pod, d.cfg.PodEvictRetryDelay, groupVersion)
}
return d.try(ctx, toEvict, "evict-pod", evictPod)
}

failed, err := d.do(ctx, data, fn)
if err != nil {
return failed, err
}

logger.Info("eviction finished")

return failed, nil
}

func (d *drainer) do(ctx context.Context, data DrainRequest, drain drainFn) ([]*core.Pod, error) {
pods, err := d.list(ctx, data.Node)
if err != nil {
return nil, err
Expand All @@ -118,33 +127,17 @@ func (d *drainer) Evict(ctx context.Context, data EvictRequest) ([]*core.Pod, er
return []*core.Pod{}, nil
}

_, ignored, err := d.tryEvict(ctx, toEvict)
_, failed, err := drain(ctx, toEvict)
if err != nil && !errors.Is(err, &k8s.PodFailedActionError{}) {
return nil, err
}

err = d.waitTerminaition(ctx, data.Node, ignored)
if err != nil {
return []*core.Pod{}, err
}

logger.Info("eviction finished")

return ignored, nil
}

func (d *drainer) tryEvict(ctx context.Context, toEvict []*core.Pod) ([]*core.Pod, []*core.Pod, error) {
groupVersion, err := drain.CheckEvictionSupport(d.client.Clientset())
err = d.waitTerminaition(ctx, data.Node, failed)
if err != nil {
return nil, nil, err
}
evictPod := func(ctx context.Context, pod core.Pod) error {
return d.client.EvictPod(ctx, pod, d.cfg.PodEvictRetryDelay, groupVersion)
return nil, err
}

successful, podsWithFailedAction := d.client.ExecuteBatchPodActions(ctx, toEvict, evictPod, "evict-pod")
failed, err := d.handleFailures(ctx, "eviction", podsWithFailedAction)
return successful, failed, err
return failed, nil
}

func (d *drainer) list(_ context.Context, fromNode string) ([]*core.Pod, error) {
Expand Down
Loading