From de91c7dad4054c4a272827daf851047688368b2a Mon Sep 17 00:00:00 2001 From: Illia Litovchenko Date: Wed, 11 Feb 2026 11:57:13 +0000 Subject: [PATCH 1/2] pod informer added / node and manager update --- internal/informer/manager.go | 61 ++- internal/informer/manager_test.go | 14 +- internal/informer/node_informer.go | 52 ++- internal/informer/node_informer_test.go | 227 +++++++++- internal/informer/pod_informer.go | 67 ++- internal/informer/pod_informer_test.go | 540 +++++++++++++++++++++++- 6 files changed, 898 insertions(+), 63 deletions(-) diff --git a/internal/informer/manager.go b/internal/informer/manager.go index 838a5f14..9a47afbf 100644 --- a/internal/informer/manager.go +++ b/internal/informer/manager.go @@ -9,6 +9,7 @@ import ( "github.com/sirupsen/logrus" authorizationv1 "k8s.io/api/authorization/v1" + core "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -42,7 +43,7 @@ type Manager struct { factory informers.SharedInformerFactory cacheSyncTimeout time.Duration - nodes NodeInformer + nodes *nodeInformer pods *podInformer volumeAttachments *vaInformer @@ -63,10 +64,11 @@ func WithCacheSyncTimeout(timeout time.Duration) Option { func EnablePodInformer() Option { return func(m *Manager) { - m.pods = &podInformer{ - informer: m.factory.Core().V1().Pods().Informer(), - lister: m.factory.Core().V1().Pods().Lister(), - } + m.pods = NewPodInformer( + m.factory.Core().V1().Pods().Informer(), + m.factory.Core().V1().Pods().Lister(), + nil, + ) } } @@ -82,7 +84,26 @@ func EnableNodeInformer() Option { // WithNodeIndexers sets custom indexers for the node informer. func WithNodeIndexers(indexers cache.Indexers) Option { return func(n *Manager) { - n.nodes.SetIndexers(indexers) + n.nodes.indexers = indexers + } +} + +func WithDefaultPodNodeNameIndexer() Option { + return WithPodIndexers(cache.Indexers{ + PodIndexerName: func(obj any) ([]string, error) { + pod, ok := obj.(*core.Pod) + if !ok { + return nil, nil + } + return []string{pod.Spec.NodeName}, nil + }, + }) +} + +// WithPodIndexers sets custom indexers for the pod informer. +func WithPodIndexers(indexers cache.Indexers) Option { + return func(n *Manager) { + n.pods.indexers = indexers } } @@ -168,7 +189,7 @@ func (m *Manager) Start(ctx context.Context) error { } m.log.Info("waiting for node informer cache to sync...") - if !cache.WaitForCacheSync(syncCtx.Done(), m.nodes.HasSynced) { + if !cache.WaitForCacheSync(syncCtx.Done(), m.nodes.informer.HasSynced) { cancel() return fmt.Errorf("failed to sync node informer cache within %v", m.cacheSyncTimeout) } @@ -178,7 +199,7 @@ func (m *Manager) Start(ctx context.Context) error { if m.pods != nil { m.log.Info("waiting for pod informer cache to sync...") - if !cache.WaitForCacheSync(syncCtx.Done(), m.pods.HasSynced) { + if !cache.WaitForCacheSync(syncCtx.Done(), m.pods.informer.HasSynced) { cancel() return fmt.Errorf("failed to sync pod informer cache within %v", m.cacheSyncTimeout) } @@ -247,7 +268,7 @@ func (m *Manager) GetNodeLister() listerv1.NodeLister { if m.nodes == nil { return nil } - return m.nodes.Lister() + return m.nodes.lister } // GetNodeInformer returns the node informer for watching node events. @@ -258,20 +279,26 @@ func (m *Manager) GetNodeInformer() NodeInformer { return m.nodes } +// Nodes returns the node informer for node domain operations. +// This is a convenience method that returns the same as GetNodeInformer(). 
+func (m *Manager) Nodes() NodeInformer { + return m.GetNodeInformer() +} + // GetPodLister returns the pod lister for querying the pod cache. func (m *Manager) GetPodLister() listerv1.PodLister { if m.pods == nil { return nil } - return m.pods.Lister() + return m.pods.lister } -// GetPodInformer returns the pod informer for watching pod events. -func (m *Manager) GetPodInformer() cache.SharedIndexInformer { +// Pods returns the pod informer for pod domain operations. +func (m *Manager) Pods() PodInformer { if m.pods == nil { return nil } - return m.pods.Informer() + return m.pods } // IsVAAvailable indicates whether the VolumeAttachment informer is available. @@ -346,8 +373,8 @@ func (m *Manager) checkVAPermissions(ctx context.Context) error { } func (m *Manager) addIndexers() error { - if m.nodes != nil && m.nodes.Indexers() != nil { - if err := m.nodes.Informer().AddIndexers(m.nodes.Indexers()); err != nil { + if m.nodes != nil && m.nodes.indexers != nil { + if err := m.nodes.informer.AddIndexers(m.nodes.indexers); err != nil { return fmt.Errorf("adding node indexers: %w", err) } } @@ -374,14 +401,14 @@ func (m *Manager) reportCacheSize(ctx context.Context) { return case <-ticker.C: if m.nodes != nil { - nodes := m.nodes.Informer().GetStore().ListKeys() + nodes := m.nodes.informer.GetStore().ListKeys() size := len(nodes) m.log.WithField("cache_size", size).Debug("node informer cache size") metrics.SetInformerCacheSize("node", size) } if m.pods != nil { - pods := m.pods.Informer().GetStore().ListKeys() + pods := m.pods.informer.GetStore().ListKeys() size := len(pods) m.log.WithField("cache_size", size).Debug("pod informer cache size") metrics.SetInformerCacheSize("pod", size) diff --git a/internal/informer/manager_test.go b/internal/informer/manager_test.go index fa449831..54b5a4fe 100644 --- a/internal/informer/manager_test.go +++ b/internal/informer/manager_test.go @@ -30,7 +30,7 @@ func TestNewManager(t *testing.T) { require.NotNil(t, manager.GetFactory()) require.NotNil(t, manager.GetNodeInformer()) require.NotNil(t, manager.GetPodLister()) - require.NotNil(t, manager.GetPodInformer()) + require.NotNil(t, manager.Pods()) // VA getters return nil before Start() because vaAvailable is false by default require.False(t, manager.IsVAAvailable()) @@ -260,17 +260,17 @@ func TestManager_VAInformer(t *testing.T) { require.True(t, manager.IsVAAvailable()) // Test lister - vas, err := manager.GetVALister().List(labels.Everything()) + was, err := manager.GetVALister().List(labels.Everything()) require.NoError(t, err) - require.Len(t, vas, 1) - require.Equal(t, "va-1", vas[0].Name) + require.Len(t, was, 1) + require.Equal(t, "va-1", was[0].Name) // Test indexer with node name lookup indexed, err := manager.GetVAIndexer().ByIndex(VANodeNameIndexer, "node-1") require.NoError(t, err) require.Len(t, indexed, 1) - // Verify no VAs on non-existent node + // Verify no was on non-existent node indexed, err = manager.GetVAIndexer().ByIndex(VANodeNameIndexer, "node-2") require.NoError(t, err) require.Len(t, indexed, 0) @@ -324,7 +324,7 @@ func TestManager_DisabledInformers(t *testing.T) { require.Nil(t, manager.GetNodeLister()) require.Nil(t, manager.GetNodeInformer()) require.Nil(t, manager.GetPodLister()) - require.Nil(t, manager.GetPodInformer()) + require.Nil(t, manager.Pods()) ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) defer cancel() @@ -338,7 +338,7 @@ func TestManager_DisabledInformers(t *testing.T) { require.Nil(t, manager.GetNodeLister()) require.Nil(t, 
manager.GetNodeInformer()) require.Nil(t, manager.GetPodLister()) - require.Nil(t, manager.GetPodInformer()) + require.Nil(t, manager.Pods()) // VA should still work (always enabled) require.True(t, manager.IsVAAvailable()) diff --git a/internal/informer/node_informer.go b/internal/informer/node_informer.go index ae961cec..b15e6c69 100644 --- a/internal/informer/node_informer.go +++ b/internal/informer/node_informer.go @@ -7,20 +7,26 @@ import ( "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" listerv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" ) type Predicate func(node *corev1.Node) (bool, error) +// NodeInformer provides domain operations for nodes. +// Lifecycle (start/stop/sync) is managed by Manager. type NodeInformer interface { - Start(ctx context.Context) error + // Get retrieves a node by name from cache + Get(name string) (*corev1.Node, error) + + // List returns all nodes from cache + List() ([]*corev1.Node, error) + + // Wait watches for a node to meet a condition. + // Returns a channel that signals when condition is met or context is canceled. + // Used for event-driven waiting (e.g., wait for node to become ready). Wait(ctx context.Context, name string, condition Predicate) chan error - Informer() cache.SharedIndexInformer - Lister() listerv1.NodeLister - Indexers() cache.Indexers - SetIndexers(indexers cache.Indexers) - HasSynced() bool } type observable struct { @@ -44,7 +50,7 @@ type nodeInformer struct { func NewNodeInformer( informer cache.SharedIndexInformer, lister listerv1.NodeLister, -) NodeInformer { +) *nodeInformer { n := &nodeInformer{ informer: informer, lister: lister, @@ -123,6 +129,18 @@ func (n *nodeInformer) onEvent(object any) { } } +func (n *nodeInformer) Get(name string) (*corev1.Node, error) { + return n.lister.Get(name) +} + +func (n *nodeInformer) List() ([]*corev1.Node, error) { + nodes, err := n.lister.List(labels.Everything()) + if err != nil { + return nil, err + } + return nodes, nil +} + func (n *nodeInformer) Wait(ctx context.Context, name string, condition Predicate) chan error { done := make(chan error, 1) @@ -165,23 +183,3 @@ func (n *nodeInformer) Wait(ctx context.Context, name string, condition Predicat return done } - -func (n *nodeInformer) Informer() cache.SharedIndexInformer { - return n.informer -} - -func (n *nodeInformer) Lister() listerv1.NodeLister { - return n.lister -} - -func (n *nodeInformer) Indexers() cache.Indexers { - return n.indexers -} - -func (n *nodeInformer) SetIndexers(indexers cache.Indexers) { - n.indexers = indexers -} - -func (n *nodeInformer) HasSynced() bool { - return n.informer.HasSynced() -} diff --git a/internal/informer/node_informer_test.go b/internal/informer/node_informer_test.go index dc7570fb..af6093a2 100644 --- a/internal/informer/node_informer_test.go +++ b/internal/informer/node_informer_test.go @@ -24,10 +24,10 @@ func TestNodeInformer_Informer(t *testing.T) { clientset := fake.NewClientset() manager := NewManager(log, clientset, time.Hour, EnableNodeInformer()) - informer := manager.nodes.Informer() - lister := manager.nodes.Lister() + informer := manager.nodes.informer + lister := manager.nodes.lister - require.False(t, manager.nodes.HasSynced()) + require.False(t, manager.nodes.informer.HasSynced()) require.NotNil(t, informer) require.NotNil(t, lister) } @@ -482,3 +482,224 @@ func TestNodeInformer_OnEvent_MultipleNodes(t *testing.T) { require.NoError(t, err2) }) } + +func TestNodeInformer_Get(t *testing.T) { + t.Parallel() + 
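+	// Fixtures: test-node-1 reports Ready, test-node-2 does not; the table
+	// below covers a cache hit, a missing node, and an empty cache.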
+ node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + + node2 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-2", + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + }, + }, + } + + tests := []struct { + name string + initialNodes []*corev1.Node + nodeName string + wantNode *corev1.Node + wantErr bool + }{ + { + name: "get existing node", + initialNodes: []*corev1.Node{node1, node2}, + nodeName: "test-node-1", + wantNode: node1, + wantErr: false, + }, + { + name: "get non-existent node", + initialNodes: []*corev1.Node{node1}, + nodeName: "non-existent-node", + wantNode: nil, + wantErr: true, + }, + { + name: "get from empty cache", + initialNodes: nil, + nodeName: "test-node-1", + wantNode: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + var initialObjects []runtime.Object + for _, node := range tt.initialNodes { + initialObjects = append(initialObjects, node) + } + + clientSet := fake.NewClientset(initialObjects...) + log := logrus.New() + + infMgr := NewManager(log, clientSet, 10*time.Minute, EnableNodeInformer()) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(func() { + cancel() + }) + + go func() { + _ = infMgr.Start(ctx) + }() + synctest.Wait() + + node, err := infMgr.GetNodeInformer().Get(tt.nodeName) + + if tt.wantErr { + require.Error(t, err) + require.Nil(t, node) + } else { + require.NoError(t, err) + require.NotNil(t, node) + require.Equal(t, tt.wantNode.Name, node.Name) + } + }) + }) + } +} + +func TestNodeInformer_List(t *testing.T) { + t.Parallel() + + node1 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + + node2 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-2", + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionFalse, + }, + }, + }, + } + + node3 := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-3", + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{ + { + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + + tests := []struct { + name string + initialNodes []*corev1.Node + wantCount int + wantErr bool + }{ + { + name: "list multiple nodes", + initialNodes: []*corev1.Node{node1, node2, node3}, + wantCount: 3, + wantErr: false, + }, + { + name: "list single node", + initialNodes: []*corev1.Node{node1}, + wantCount: 1, + wantErr: false, + }, + { + name: "list empty cache", + initialNodes: nil, + wantCount: 0, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + var initialObjects []runtime.Object + for _, node := range tt.initialNodes { + initialObjects = append(initialObjects, node) + } + + clientSet := fake.NewClientset(initialObjects...) 
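+				// Seeding the fake clientset means the node cache already contains
+				// these fixtures once Start's initial sync completes.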
+ log := logrus.New() + + infMgr := NewManager(log, clientSet, 10*time.Minute, EnableNodeInformer()) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(func() { + cancel() + }) + + go func() { + _ = infMgr.Start(ctx) + }() + synctest.Wait() + + nodes, err := infMgr.GetNodeInformer().List() + + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Len(t, nodes, tt.wantCount) + + if tt.wantCount > 0 { + nodeNames := make(map[string]bool) + for _, node := range nodes { + nodeNames[node.Name] = true + } + + for _, expectedNode := range tt.initialNodes { + require.True(t, nodeNames[expectedNode.Name], "expected node %s not found in list", expectedNode.Name) + } + } + } + }) + }) + } +} diff --git a/internal/informer/pod_informer.go b/internal/informer/pod_informer.go index f30bf5a3..717c7b2f 100644 --- a/internal/informer/pod_informer.go +++ b/internal/informer/pod_informer.go @@ -1,24 +1,79 @@ package informer import ( + "errors" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" listerv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" ) +var ErrIndexerMissing = errors.New("missing indexer") + +const PodIndexerName = "spec.NodeName" + +// PodInformer provides domain operations for pods. +type PodInformer interface { + // Get retrieves a pod by namespace and name from cache + Get(namespace, name string) (*corev1.Pod, error) + + // List returns all pods from cache + List() ([]*corev1.Pod, error) + + // ListByNode returns all pods running on a specific node + ListByNode(nodeName string) ([]*corev1.Pod, error) + + // ListByNamespace returns all pods in a namespace + ListByNamespace(namespace string) ([]*corev1.Pod, error) +} + type podInformer struct { informer cache.SharedIndexInformer lister listerv1.PodLister indexers cache.Indexers } -func (p *podInformer) Informer() cache.SharedIndexInformer { - return p.informer +func NewPodInformer( + informer cache.SharedIndexInformer, + lister listerv1.PodLister, + indexer cache.Indexer, +) *podInformer { + p := &podInformer{ + informer: informer, + lister: lister, + } + return p +} + +// Get retrieves a pod by namespace and name from cache +func (p *podInformer) Get(namespace, name string) (*corev1.Pod, error) { + return p.lister.Pods(namespace).Get(name) } -func (p *podInformer) Lister() listerv1.PodLister { - return p.lister +// List returns all pods from cache +func (p *podInformer) List() ([]*corev1.Pod, error) { + return p.lister.List(labels.Everything()) +} + +// ListByNode returns all pods running on a specific node +func (p *podInformer) ListByNode(nodeName string) ([]*corev1.Pod, error) { + objects, err := p.informer.GetIndexer().ByIndex(PodIndexerName, nodeName) + if err != nil { + return nil, err + } + + pods := make([]*corev1.Pod, 0, len(objects)) + for _, obj := range objects { + pod, ok := obj.(*corev1.Pod) + if ok { + pods = append(pods, pod) + } + } + return pods, nil } -func (p *podInformer) HasSynced() bool { - return p.informer.HasSynced() +// ListByNamespace returns all pods in a namespace +func (p *podInformer) ListByNamespace(namespace string) ([]*corev1.Pod, error) { + return p.lister.Pods(namespace).List(labels.Everything()) } diff --git a/internal/informer/pod_informer_test.go b/internal/informer/pod_informer_test.go index b0705151..1633ef29 100644 --- a/internal/informer/pod_informer_test.go +++ b/internal/informer/pod_informer_test.go @@ -1,11 +1,16 @@ package informer import ( + "context" "testing" + "testing/synctest" "time" 
"github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" ) @@ -16,10 +21,539 @@ func TestPodInformer_Informer(t *testing.T) { clientset := fake.NewClientset() manager := NewManager(log, clientset, time.Hour, EnablePodInformer()) - informer := manager.pods.Informer() - lister := manager.pods.Lister() + informer := manager.pods.informer + lister := manager.pods.lister require.NotNil(t, informer) require.NotNil(t, lister) - require.False(t, manager.pods.HasSynced()) + require.False(t, manager.pods.informer.HasSynced()) +} + +func TestPodInformer_Get(t *testing.T) { + t.Parallel() + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-2", + Namespace: "kube-system", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + + tests := []struct { + name string + initialPods []*corev1.Pod + namespace string + podName string + wantPod *corev1.Pod + wantErr bool + }{ + { + name: "get existing pod in default namespace", + initialPods: []*corev1.Pod{pod1, pod2}, + namespace: "default", + podName: "test-pod-1", + wantPod: pod1, + wantErr: false, + }, + { + name: "get existing pod in kube-system namespace", + initialPods: []*corev1.Pod{pod1, pod2}, + namespace: "kube-system", + podName: "test-pod-2", + wantPod: pod2, + wantErr: false, + }, + { + name: "get non-existent pod", + initialPods: []*corev1.Pod{pod1}, + namespace: "default", + podName: "non-existent-pod", + wantPod: nil, + wantErr: true, + }, + { + name: "get pod from wrong namespace", + initialPods: []*corev1.Pod{pod1}, + namespace: "kube-system", + podName: "test-pod-1", + wantPod: nil, + wantErr: true, + }, + { + name: "get from empty cache", + initialPods: nil, + namespace: "default", + podName: "test-pod-1", + wantPod: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + var initialObjects []runtime.Object + for _, pod := range tt.initialPods { + initialObjects = append(initialObjects, pod) + } + + clientSet := fake.NewClientset(initialObjects...) 
+ log := logrus.New() + + infMgr := NewManager(log, clientSet, 10*time.Minute, EnablePodInformer()) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(func() { + cancel() + }) + + go func() { + _ = infMgr.Start(ctx) + }() + synctest.Wait() + + pod, err := infMgr.Pods().Get(tt.namespace, tt.podName) + + if tt.wantErr { + require.Error(t, err) + require.Nil(t, pod) + } else { + require.NoError(t, err) + require.NotNil(t, pod) + require.Equal(t, tt.wantPod.Name, pod.Name) + require.Equal(t, tt.wantPod.Namespace, pod.Namespace) + } + }) + }) + } +} + +func TestPodInformer_List(t *testing.T) { + t.Parallel() + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-2", + Namespace: "kube-system", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + } + + pod3 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-3", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + } + + tests := []struct { + name string + initialPods []*corev1.Pod + wantCount int + wantErr bool + }{ + { + name: "list multiple pods", + initialPods: []*corev1.Pod{pod1, pod2, pod3}, + wantCount: 3, + wantErr: false, + }, + { + name: "list single pod", + initialPods: []*corev1.Pod{pod1}, + wantCount: 1, + wantErr: false, + }, + { + name: "list empty cache", + initialPods: nil, + wantCount: 0, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + var initialObjects []runtime.Object + for _, pod := range tt.initialPods { + initialObjects = append(initialObjects, pod) + } + + clientSet := fake.NewClientset(initialObjects...) 
+ log := logrus.New() + + infMgr := NewManager(log, clientSet, 10*time.Minute, EnablePodInformer()) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(func() { + cancel() + }) + + go func() { + _ = infMgr.Start(ctx) + }() + synctest.Wait() + + pods, err := infMgr.Pods().List() + + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Len(t, pods, tt.wantCount) + + // Verify that the returned pods match the initial pods + if tt.wantCount > 0 { + podKeys := make(map[string]bool) + for _, pod := range pods { + key := pod.Namespace + "/" + pod.Name + podKeys[key] = true + } + + for _, expectedPod := range tt.initialPods { + expectedKey := expectedPod.Namespace + "/" + expectedPod.Name + require.True(t, podKeys[expectedKey], "expected pod %s not found in list", expectedKey) + } + } + } + }) + }) + } +} + +func TestPodInformer_ListByNode(t *testing.T) { + t.Parallel() + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-2", + Namespace: "kube-system", + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + } + + pod3 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-3", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + } + + pod4 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-4", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "", // unscheduled pod + }, + } + + tests := []struct { + name string + initialPods []*corev1.Pod + nodeName string + wantCount int + wantPods []*corev1.Pod + wantErr bool + }{ + { + name: "list pods on node-1", + initialPods: []*corev1.Pod{pod1, pod2, pod3}, + nodeName: "node-1", + wantCount: 2, + wantPods: []*corev1.Pod{pod1, pod2}, + wantErr: false, + }, + { + name: "list pods on node-2", + initialPods: []*corev1.Pod{pod1, pod2, pod3}, + nodeName: "node-2", + wantCount: 1, + wantPods: []*corev1.Pod{pod3}, + wantErr: false, + }, + { + name: "list pods on node with no pods", + initialPods: []*corev1.Pod{pod1, pod2}, + nodeName: "node-3", + wantCount: 0, + wantPods: nil, + wantErr: false, + }, + { + name: "list pods with empty cache", + initialPods: nil, + nodeName: "node-1", + wantCount: 0, + wantPods: nil, + wantErr: false, + }, + { + name: "list pods excludes unscheduled pods", + initialPods: []*corev1.Pod{pod1, pod4}, + nodeName: "node-1", + wantCount: 1, + wantPods: []*corev1.Pod{pod1}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + var initialObjects []runtime.Object + for _, pod := range tt.initialPods { + initialObjects = append(initialObjects, pod) + } + + clientSet := fake.NewClientset(initialObjects...) 
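+				// WithDefaultPodNodeNameIndexer (passed to NewManager below) registers
+				// the spec.NodeName index that ListByNode queries via ByIndex.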
+ log := logrus.New() + + infMgr := NewManager(log, clientSet, 10*time.Minute, EnablePodInformer(), WithDefaultPodNodeNameIndexer()) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(func() { + cancel() + }) + + go func() { + _ = infMgr.Start(ctx) + }() + synctest.Wait() + + pods, err := infMgr.Pods().ListByNode(tt.nodeName) + + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Len(t, pods, tt.wantCount) + + // Verify that all returned pods are on the specified node + for _, pod := range pods { + require.Equal(t, tt.nodeName, pod.Spec.NodeName) + } + + // Verify that the returned pods match the expected pods + if tt.wantCount > 0 { + podKeys := make(map[string]bool) + for _, pod := range pods { + key := pod.Namespace + "/" + pod.Name + podKeys[key] = true + } + + for _, expectedPod := range tt.wantPods { + expectedKey := expectedPod.Namespace + "/" + expectedPod.Name + require.True(t, podKeys[expectedKey], "expected pod %s not found in list", expectedKey) + } + } + } + }) + }) + } +} + +func TestPodInformer_ListByNamespace(t *testing.T) { + t.Parallel() + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-1", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-2", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + } + + pod3 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-3", + Namespace: "kube-system", + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + } + + pod4 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pod-4", + Namespace: "custom-namespace", + }, + Spec: corev1.PodSpec{ + NodeName: "node-1", + }, + } + + tests := []struct { + name string + initialPods []*corev1.Pod + namespace string + wantCount int + wantPods []*corev1.Pod + wantErr bool + }{ + { + name: "list pods in default namespace", + initialPods: []*corev1.Pod{pod1, pod2, pod3, pod4}, + namespace: "default", + wantCount: 2, + wantPods: []*corev1.Pod{pod1, pod2}, + wantErr: false, + }, + { + name: "list pods in kube-system namespace", + initialPods: []*corev1.Pod{pod1, pod2, pod3, pod4}, + namespace: "kube-system", + wantCount: 1, + wantPods: []*corev1.Pod{pod3}, + wantErr: false, + }, + { + name: "list pods in custom namespace", + initialPods: []*corev1.Pod{pod1, pod2, pod3, pod4}, + namespace: "custom-namespace", + wantCount: 1, + wantPods: []*corev1.Pod{pod4}, + wantErr: false, + }, + { + name: "list pods in empty namespace", + initialPods: []*corev1.Pod{pod1, pod2}, + namespace: "empty-namespace", + wantCount: 0, + wantPods: nil, + wantErr: false, + }, + { + name: "list pods with empty cache", + initialPods: nil, + namespace: "default", + wantCount: 0, + wantPods: nil, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + synctest.Test(t, func(t *testing.T) { + var initialObjects []runtime.Object + for _, pod := range tt.initialPods { + initialObjects = append(initialObjects, pod) + } + + clientSet := fake.NewClientset(initialObjects...) 
+ log := logrus.New() + + infMgr := NewManager(log, clientSet, 10*time.Minute, EnablePodInformer()) + + ctx, cancel := context.WithCancel(t.Context()) + t.Cleanup(func() { + cancel() + }) + + go func() { + _ = infMgr.Start(ctx) + }() + synctest.Wait() + + pods, err := infMgr.Pods().ListByNamespace(tt.namespace) + + if tt.wantErr { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Len(t, pods, tt.wantCount) + + // Verify that all returned pods are in the specified namespace + for _, pod := range pods { + require.Equal(t, tt.namespace, pod.Namespace) + } + + // Verify that the returned pods match the expected pods + if tt.wantCount > 0 { + podNames := make(map[string]bool) + for _, pod := range pods { + podNames[pod.Name] = true + } + + for _, expectedPod := range tt.wantPods { + require.True(t, podNames[expectedPod.Name], "expected pod %s not found in namespace %s", expectedPod.Name, tt.namespace) + } + } + } + }) + }) + } } From 10c95e12b770242c6177e3641f93541ddd7a611d Mon Sep 17 00:00:00 2001 From: Illia Litovchenko Date: Wed, 11 Feb 2026 12:00:00 +0000 Subject: [PATCH 2/2] spellchecker fix --- internal/informer/manager_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/informer/manager_test.go b/internal/informer/manager_test.go index 54b5a4fe..cbd84f08 100644 --- a/internal/informer/manager_test.go +++ b/internal/informer/manager_test.go @@ -260,17 +260,17 @@ func TestManager_VAInformer(t *testing.T) { require.True(t, manager.IsVAAvailable()) // Test lister - was, err := manager.GetVALister().List(labels.Everything()) + volumeAttachments, err := manager.GetVALister().List(labels.Everything()) require.NoError(t, err) - require.Len(t, was, 1) - require.Equal(t, "va-1", was[0].Name) + require.Len(t, volumeAttachments, 1) + require.Equal(t, "va-1", volumeAttachments[0].Name) // Test indexer with node name lookup indexed, err := manager.GetVAIndexer().ByIndex(VANodeNameIndexer, "node-1") require.NoError(t, err) require.Len(t, indexed, 1) - // Verify no was on non-existent node + // Verify no volumeAttachments on non-existent node indexed, err = manager.GetVAIndexer().ByIndex(VANodeNameIndexer, "node-2") require.NoError(t, err) require.Len(t, indexed, 0)
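A minimal usage sketch of the new accessors, assuming the manager was built with EnableNodeInformer, EnablePodInformer and WithDefaultPodNodeNameIndexer and that Start(ctx) has synced the caches; the helper name listPodsOnReadyNode and its placement in the informer package are illustrative and not part of this patch:

package informer

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// listPodsOnReadyNode shows how Nodes() and Pods() compose: resolve the node
// from cache, check readiness, then list its pods via the spec.NodeName index.
func listPodsOnReadyNode(m *Manager, nodeName string) ([]*corev1.Pod, error) {
	node, err := m.Nodes().Get(nodeName)
	if err != nil {
		return nil, fmt.Errorf("getting node %q: %w", nodeName, err)
	}
	for _, cond := range node.Status.Conditions {
		if cond.Type == corev1.NodeReady && cond.Status != corev1.ConditionTrue {
			return nil, fmt.Errorf("node %q is not ready", nodeName)
		}
	}
	// ListByNode relies on the spec.NodeName index, so it avoids a full cache scan.
	return m.Pods().ListByNode(nodeName)
}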