Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions internal/k8s/kubernetes.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:generate mockgen -destination ./mock/kubernetes.go . Client

package k8s

import (
Expand Down Expand Up @@ -44,32 +46,49 @@ const (
DefaultMaxRetriesK8SOperation = 5
)

type Client interface {
PatchNode(ctx context.Context, node *v1.Node, changeFn func(*v1.Node)) error
PatchNodeStatus(ctx context.Context, name string, patch []byte) error
EvictPod(ctx context.Context, pod v1.Pod, podEvictRetryDelay time.Duration, version schema.GroupVersion) error
CordonNode(ctx context.Context, node *v1.Node) error
GetNodeByIDs(ctx context.Context, nodeName, nodeID, providerID string) (*v1.Node, error)
ExecuteBatchPodActions(
ctx context.Context,
pods []*v1.Pod,
action func(context.Context, v1.Pod) error,
actionName string,
) ([]*v1.Pod, []PodActionFailure)
DeletePod(ctx context.Context, options metav1.DeleteOptions, pod v1.Pod, podDeleteRetries int, podDeleteRetryDelay time.Duration) error
Clientset() kubernetes.Interface
Log() logrus.FieldLogger
}

// Client provides Kubernetes operations with common dependencies.
type Client struct {
type client struct {
clientset kubernetes.Interface
log logrus.FieldLogger
}

// NewClient creates a new K8s client with the given dependencies.
func NewClient(clientset kubernetes.Interface, log logrus.FieldLogger) *Client {
return &Client{
func NewClient(clientset kubernetes.Interface, log logrus.FieldLogger) Client {
return &client{
clientset: clientset,
log: log,
}
}

// Clientset returns the underlying kubernetes.Interface.
func (c *Client) Clientset() kubernetes.Interface {
func (c *client) Clientset() kubernetes.Interface {
return c.clientset
}

// Log returns the logger.
func (c *Client) Log() logrus.FieldLogger {
func (c *client) Log() logrus.FieldLogger {
return c.log
}

// PatchNode patches a node with the given change function.
func (c *Client) PatchNode(ctx context.Context, node *v1.Node, changeFn func(*v1.Node)) error {
func (c *client) PatchNode(ctx context.Context, node *v1.Node, changeFn func(*v1.Node)) error {
logger := logger.FromContext(ctx, c.log)
oldData, err := json.Marshal(node)
if err != nil {
Expand Down Expand Up @@ -108,7 +127,7 @@ func (c *Client) PatchNode(ctx context.Context, node *v1.Node, changeFn func(*v1
}

// PatchNodeStatus patches the status of a node.
func (c *Client) PatchNodeStatus(ctx context.Context, name string, patch []byte) error {
func (c *client) PatchNodeStatus(ctx context.Context, name string, patch []byte) error {
logger := logger.FromContext(ctx, c.log)

err := waitext.Retry(
Expand All @@ -134,7 +153,7 @@ func (c *Client) PatchNodeStatus(ctx context.Context, name string, patch []byte)
return nil
}

func (c *Client) CordonNode(ctx context.Context, node *v1.Node) error {
func (c *client) CordonNode(ctx context.Context, node *v1.Node) error {
if node.Spec.Unschedulable {
return nil
}
Expand All @@ -149,7 +168,7 @@ func (c *Client) CordonNode(ctx context.Context, node *v1.Node) error {
}

// GetNodeByIDs retrieves a node by name and validates its ID and provider ID.
func (c *Client) GetNodeByIDs(ctx context.Context, nodeName, nodeID, providerID string) (*v1.Node, error) {
func (c *client) GetNodeByIDs(ctx context.Context, nodeName, nodeID, providerID string) (*v1.Node, error) {
if nodeID == "" && providerID == "" {
return nil, fmt.Errorf("node and provider IDs are empty %w", ErrAction)
}
Expand Down Expand Up @@ -178,7 +197,7 @@ func (c *Client) GetNodeByIDs(ctx context.Context, nodeName, nodeID, providerID
// It does internal throttling to avoid spawning a goroutine-per-pod on large lists.
// Returns two sets of pods - the ones that successfully executed the action and the ones that failed.
// actionName might be used to distinguish what is the operation (for logs, debugging, etc.) but is optional.
func (c *Client) ExecuteBatchPodActions(
func (c *client) ExecuteBatchPodActions(
ctx context.Context,
pods []*v1.Pod,
action func(context.Context, v1.Pod) error,
Expand Down Expand Up @@ -250,7 +269,7 @@ func (c *Client) ExecuteBatchPodActions(

// EvictPod evicts a pod from a k8s node. Error handling is based on eviction api documentation:
// https://kubernetes.io/docs/tasks/administer-cluster/safely-drain-node/#the-eviction-api
func (c *Client) EvictPod(ctx context.Context, pod v1.Pod, podEvictRetryDelay time.Duration, version schema.GroupVersion) error {
func (c *client) EvictPod(ctx context.Context, pod v1.Pod, podEvictRetryDelay time.Duration, version schema.GroupVersion) error {
logger := logger.FromContext(ctx, c.log)

b := waitext.NewConstantBackoff(podEvictRetryDelay)
Expand Down Expand Up @@ -306,7 +325,7 @@ func (c *Client) EvictPod(ctx context.Context, pod v1.Pod, podEvictRetryDelay ti
}

// DeletePod deletes a pod from the cluster.
func (c *Client) DeletePod(ctx context.Context, options metav1.DeleteOptions, pod v1.Pod, podDeleteRetries int, podDeleteRetryDelay time.Duration) error {
func (c *client) DeletePod(ctx context.Context, options metav1.DeleteOptions, pod v1.Pod, podDeleteRetries int, podDeleteRetryDelay time.Duration) error {
logger := logger.FromContext(ctx, c.log)

b := waitext.NewConstantBackoff(podDeleteRetryDelay)
Expand Down
170 changes: 170 additions & 0 deletions internal/k8s/mock/kubernetes.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions internal/nodes/mock/kubernetes.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading