Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 44 additions & 17 deletions internal/informer/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/sirupsen/logrus"
authorizationv1 "k8s.io/api/authorization/v1"
core "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -42,7 +43,7 @@ type Manager struct {
factory informers.SharedInformerFactory
cacheSyncTimeout time.Duration

nodes NodeInformer
nodes *nodeInformer
pods *podInformer
volumeAttachments *vaInformer

Expand All @@ -63,10 +64,11 @@ func WithCacheSyncTimeout(timeout time.Duration) Option {

func EnablePodInformer() Option {
return func(m *Manager) {
m.pods = &podInformer{
informer: m.factory.Core().V1().Pods().Informer(),
lister: m.factory.Core().V1().Pods().Lister(),
}
m.pods = NewPodInformer(
m.factory.Core().V1().Pods().Informer(),
m.factory.Core().V1().Pods().Lister(),
nil,
)
}
}

Expand All @@ -82,7 +84,26 @@ func EnableNodeInformer() Option {
// WithNodeIndexers sets custom indexers for the node informer.
func WithNodeIndexers(indexers cache.Indexers) Option {
return func(n *Manager) {
n.nodes.SetIndexers(indexers)
n.nodes.indexers = indexers
}
}

func WithDefaultPodNodeNameIndexer() Option {
return WithPodIndexers(cache.Indexers{
PodIndexerName: func(obj any) ([]string, error) {
pod, ok := obj.(*core.Pod)
if !ok {
return nil, nil
}
return []string{pod.Spec.NodeName}, nil
},
})
}

// WithPodIndexers sets custom indexers for the pod informer.
func WithPodIndexers(indexers cache.Indexers) Option {
return func(n *Manager) {
n.pods.indexers = indexers
}
}

Expand Down Expand Up @@ -168,7 +189,7 @@ func (m *Manager) Start(ctx context.Context) error {
}

m.log.Info("waiting for node informer cache to sync...")
if !cache.WaitForCacheSync(syncCtx.Done(), m.nodes.HasSynced) {
if !cache.WaitForCacheSync(syncCtx.Done(), m.nodes.informer.HasSynced) {
cancel()
return fmt.Errorf("failed to sync node informer cache within %v", m.cacheSyncTimeout)
}
Expand All @@ -178,7 +199,7 @@ func (m *Manager) Start(ctx context.Context) error {

if m.pods != nil {
m.log.Info("waiting for pod informer cache to sync...")
if !cache.WaitForCacheSync(syncCtx.Done(), m.pods.HasSynced) {
if !cache.WaitForCacheSync(syncCtx.Done(), m.pods.informer.HasSynced) {
cancel()
return fmt.Errorf("failed to sync pod informer cache within %v", m.cacheSyncTimeout)
}
Expand Down Expand Up @@ -247,7 +268,7 @@ func (m *Manager) GetNodeLister() listerv1.NodeLister {
if m.nodes == nil {
return nil
}
return m.nodes.Lister()
return m.nodes.lister
}

// GetNodeInformer returns the node informer for watching node events.
Expand All @@ -258,20 +279,26 @@ func (m *Manager) GetNodeInformer() NodeInformer {
return m.nodes
}

// Nodes returns the node informer for node domain operations.
// This is a convenience method that returns the same as GetNodeInformer().
func (m *Manager) Nodes() NodeInformer {
return m.GetNodeInformer()
}

// GetPodLister returns the pod lister for querying the pod cache.
func (m *Manager) GetPodLister() listerv1.PodLister {
if m.pods == nil {
return nil
}
return m.pods.Lister()
return m.pods.lister
}

// GetPodInformer returns the pod informer for watching pod events.
func (m *Manager) GetPodInformer() cache.SharedIndexInformer {
// Pods returns the pod informer for pod domain operations.
func (m *Manager) Pods() PodInformer {
if m.pods == nil {
return nil
}
return m.pods.Informer()
return m.pods
}

// IsVAAvailable indicates whether the VolumeAttachment informer is available.
Expand Down Expand Up @@ -346,8 +373,8 @@ func (m *Manager) checkVAPermissions(ctx context.Context) error {
}

func (m *Manager) addIndexers() error {
if m.nodes != nil && m.nodes.Indexers() != nil {
if err := m.nodes.Informer().AddIndexers(m.nodes.Indexers()); err != nil {
if m.nodes != nil && m.nodes.indexers != nil {
if err := m.nodes.informer.AddIndexers(m.nodes.indexers); err != nil {
return fmt.Errorf("adding node indexers: %w", err)
}
}
Expand All @@ -374,14 +401,14 @@ func (m *Manager) reportCacheSize(ctx context.Context) {
return
case <-ticker.C:
if m.nodes != nil {
nodes := m.nodes.Informer().GetStore().ListKeys()
nodes := m.nodes.informer.GetStore().ListKeys()
size := len(nodes)
m.log.WithField("cache_size", size).Debug("node informer cache size")
metrics.SetInformerCacheSize("node", size)
}

if m.pods != nil {
pods := m.pods.Informer().GetStore().ListKeys()
pods := m.pods.informer.GetStore().ListKeys()
size := len(pods)
m.log.WithField("cache_size", size).Debug("pod informer cache size")
metrics.SetInformerCacheSize("pod", size)
Expand Down
14 changes: 7 additions & 7 deletions internal/informer/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestNewManager(t *testing.T) {
require.NotNil(t, manager.GetFactory())
require.NotNil(t, manager.GetNodeInformer())
require.NotNil(t, manager.GetPodLister())
require.NotNil(t, manager.GetPodInformer())
require.NotNil(t, manager.Pods())

// VA getters return nil before Start() because vaAvailable is false by default
require.False(t, manager.IsVAAvailable())
Expand Down Expand Up @@ -260,17 +260,17 @@ func TestManager_VAInformer(t *testing.T) {
require.True(t, manager.IsVAAvailable())

// Test lister
vas, err := manager.GetVALister().List(labels.Everything())
volumeAttachments, err := manager.GetVALister().List(labels.Everything())
require.NoError(t, err)
require.Len(t, vas, 1)
require.Equal(t, "va-1", vas[0].Name)
require.Len(t, volumeAttachments, 1)
require.Equal(t, "va-1", volumeAttachments[0].Name)

// Test indexer with node name lookup
indexed, err := manager.GetVAIndexer().ByIndex(VANodeNameIndexer, "node-1")
require.NoError(t, err)
require.Len(t, indexed, 1)

// Verify no VAs on non-existent node
// Verify no volumeAttachments on non-existent node
indexed, err = manager.GetVAIndexer().ByIndex(VANodeNameIndexer, "node-2")
require.NoError(t, err)
require.Len(t, indexed, 0)
Expand Down Expand Up @@ -324,7 +324,7 @@ func TestManager_DisabledInformers(t *testing.T) {
require.Nil(t, manager.GetNodeLister())
require.Nil(t, manager.GetNodeInformer())
require.Nil(t, manager.GetPodLister())
require.Nil(t, manager.GetPodInformer())
require.Nil(t, manager.Pods())

ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second)
defer cancel()
Expand All @@ -338,7 +338,7 @@ func TestManager_DisabledInformers(t *testing.T) {
require.Nil(t, manager.GetNodeLister())
require.Nil(t, manager.GetNodeInformer())
require.Nil(t, manager.GetPodLister())
require.Nil(t, manager.GetPodInformer())
require.Nil(t, manager.Pods())

// VA should still work (always enabled)
require.True(t, manager.IsVAAvailable())
Expand Down
52 changes: 25 additions & 27 deletions internal/informer/node_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@ import (

"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
)

type Predicate func(node *corev1.Node) (bool, error)

// NodeInformer provides domain operations for nodes.
// Lifecycle (start/stop/sync) is managed by Manager.
type NodeInformer interface {
Start(ctx context.Context) error
// Get retrieves a node by name from cache
Get(name string) (*corev1.Node, error)

// List returns all nodes from cache
List() ([]*corev1.Node, error)

// Wait watches for a node to meet a condition.
// Returns a channel that signals when condition is met or context is canceled.
// Used for event-driven waiting (e.g., wait for node to become ready).
Wait(ctx context.Context, name string, condition Predicate) chan error
Informer() cache.SharedIndexInformer
Lister() listerv1.NodeLister
Indexers() cache.Indexers
SetIndexers(indexers cache.Indexers)
HasSynced() bool
}

type observable struct {
Expand All @@ -44,7 +50,7 @@ type nodeInformer struct {
func NewNodeInformer(
informer cache.SharedIndexInformer,
lister listerv1.NodeLister,
) NodeInformer {
) *nodeInformer {
n := &nodeInformer{
informer: informer,
lister: lister,
Expand Down Expand Up @@ -123,6 +129,18 @@ func (n *nodeInformer) onEvent(object any) {
}
}

func (n *nodeInformer) Get(name string) (*corev1.Node, error) {
return n.lister.Get(name)
}

func (n *nodeInformer) List() ([]*corev1.Node, error) {
nodes, err := n.lister.List(labels.Everything())
if err != nil {
return nil, err
}
return nodes, nil
}

func (n *nodeInformer) Wait(ctx context.Context, name string, condition Predicate) chan error {
done := make(chan error, 1)

Expand Down Expand Up @@ -165,23 +183,3 @@ func (n *nodeInformer) Wait(ctx context.Context, name string, condition Predicat

return done
}

func (n *nodeInformer) Informer() cache.SharedIndexInformer {
return n.informer
}

func (n *nodeInformer) Lister() listerv1.NodeLister {
return n.lister
}

func (n *nodeInformer) Indexers() cache.Indexers {
return n.indexers
}

func (n *nodeInformer) SetIndexers(indexers cache.Indexers) {
n.indexers = indexers
}

func (n *nodeInformer) HasSynced() bool {
return n.informer.HasSynced()
}
Loading
Loading