k8s client-go源码分析 informer源码分析(2)-初始化与启动分析

前面一篇文章对k8s informer做了概要分析,本篇文章将对informer的初始化与启动进行分析。



k8s client-go informer主要包括以下部件:
(2)DeltaFIFO:DeltaFIFO中存储着一个map和一个queue,即map[object key]Deltas以及object key的queue,Deltas为Delta的切片类型,Delta装有对象及对象的变化类型(Added/Updated/Deleted/Sync) ,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出;
(3)Controller:Controller从DeltaFIFO的queue中pop一个object key出来,并获取其关联的 Deltas出来进行处理,遍历Deltas,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发生;


    ... 	factory := informers.NewSharedInformerFactory(client, 30*time.Second) 	podInformer := factory.Core().V1().Pods() 	informer := podInformer.Informer() 	... 	go factory.Start(stopper) 	... 	if !cache.WaitForCacheSync(stopper, informer.HasSynced) { 		runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) 		return 	} 	... 

(1)informers.NewSharedInformerFactory:初始化informer factory;
(2)podInformer.Informer:初始化pod informer;
(3)factory.Start:启动informer factory;


基于k8s v1.17.4版本依赖的client-go


1.1 sharedInformerFactory结构体



// staging/src/k8s.io/client-go/informers/factory.go type sharedInformerFactory struct { 	client           kubernetes.Interface 	namespace        string 	tweakListOptions internalinterfaces.TweakListOptionsFunc 	lock             sync.Mutex 	defaultResync    time.Duration 	customResync     map[reflect.Type]time.Duration  	informers map[reflect.Type]cache.SharedIndexInformer 	// startedInformers is used for tracking which informers have been started. 	// This allows Start() to be called multiple times safely. 	startedInformers map[reflect.Type]bool } 

1.2 NewSharedInformerFactory

NewSharedInformerFactory方法用于初始化informer factory,主要是初始化并返回sharedInformerFactory结构体。

// staging/src/k8s.io/client-go/informers/factory.go func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory { 	return NewSharedInformerFactoryWithOptions(client, defaultResync) }  func NewFilteredSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory { 	return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions)) }  func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { 	factory := &sharedInformerFactory{ 		client:           client, 		namespace:        v1.NamespaceAll, 		defaultResync:    defaultResync, 		informers:        make(map[reflect.Type]cache.SharedIndexInformer), 		startedInformers: make(map[reflect.Type]bool), 		customResync:     make(map[reflect.Type]time.Duration), 	}  	// Apply all options 	for _, opt := range options { 		factory = opt(factory) 	}  	return factory } 



    // 初始化informer factory以及pod informer 	factory := informers.NewSharedInformerFactory(client, 30*time.Second) 	podInformer := factory.Core().V1().Pods() 	informer := podInformer.Informer() 

2.1 podInformer.Informer

Informer方法中调用了f.factory.InformerFor方法来做pod informer的初始化。

// k8s.io/client-go/informers/core/v1/pod.go func (f *podInformer) Informer() cache.SharedIndexInformer { 	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) } 

2.2 f.factory.InformerFor

Informer方法中调用了f.factory.InformerFor方法来做pod informer的初始化,并传入f.defaultInformer作为newFunc,而在f.factory.InformerFor方法中,调用newFunc来初始化informer。

这里也可以看到,其实informer初始化后会存储进map f.informers[informerType]中,即存储进sharedInformerFactory结构体的informers属性中,方便共享使用。

// staging/src/k8s.io/client-go/informers/factory.go func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { 	f.lock.Lock() 	defer f.lock.Unlock()  	informerType := reflect.TypeOf(obj) 	informer, exists := f.informers[informerType] 	if exists { 		return informer 	}  	resyncPeriod, exists := f.customResync[informerType] 	if !exists { 		resyncPeriod = f.defaultResync 	}  	informer = newFunc(f.client, resyncPeriod) 	f.informers[informerType] = informer  	return informer } 

2.3 newFunc/f.defaultInformer

defaultInformer方法中,调用了NewFilteredPodInformer方法来初始化pod informer,最终初始化并返回sharedIndexInformer结构体。

// k8s.io/client-go/informers/core/v1/pod.go func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { 	return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) }  func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { 	return cache.NewSharedIndexInformer( 		&cache.ListWatch{ 			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { 				if tweakListOptions != nil { 					tweakListOptions(&options) 				} 				return client.CoreV1().Pods(namespace).List(options) 			}, 			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { 				if tweakListOptions != nil { 					tweakListOptions(&options) 				} 				return client.CoreV1().Pods(namespace).Watch(options) 			}, 		}, 		&corev1.Pod{}, 		resyncPeriod, 		indexers, 	) }  func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { 	realClock := &clock.RealClock{} 	sharedIndexInformer := &sharedIndexInformer{ 		processor:                       &sharedProcessor{clock: realClock}, 		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), 		listerWatcher:                   lw, 		objectType:                      objType, 		resyncCheckPeriod:               defaultEventHandlerResyncPeriod, 		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, 		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)), 		clock: realClock, 	} 	return sharedIndexInformer } 

2.4 sharedIndexInformer结构体

(2)controller:对应着informer中的部件Controller,Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer中的本地内存缓存,并通知Processor,相关对象有变化事件发生;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go type sharedIndexInformer struct { 	indexer    Indexer 	controller Controller  	processor             *sharedProcessor 	cacheMutationDetector CacheMutationDetector  	// This block is tracked to handle late initialization of the controller 	listerWatcher ListerWatcher 	objectType    runtime.Object  	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call 	// shouldResync to check if any of our listeners need a resync. 	resyncCheckPeriod time.Duration 	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via 	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default 	// value). 	defaultEventHandlerResyncPeriod time.Duration 	// clock allows for testability 	clock clock.Clock  	started, stopped bool 	startedLock      sync.Mutex  	// blockDeltas gives a way to stop all event distribution so that a late event handler 	// can safely join the shared informer. 	blockDeltas sync.Mutex } 


// staging/src/k8s.io/client-go/tools/cache/store.go type cache struct { 	cacheStorage ThreadSafeStore 	keyFunc KeyFunc } 

threadSafeMap struct是ThreadSafeStore接口的一个实现,其最重要的一个属性便是items了,items是用map构建的键值对,资源对象都存在items这个map中,key根据资源对象来算出,value为资源对象本身,这里的items即为informer的本地缓存了,而indexers与indices属性则与索引功能有关。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go type threadSafeMap struct { 	lock  sync.RWMutex 	items map[string]interface{}  	// indexers maps a name to an IndexFunc 	indexers Indexers 	// indices maps a name to an Index 	indices Indices } 



(2)DeltaFIFO:DeltaFIFO存储着map[object key]Deltas以及object key的queue,Delta装有对象及对象的变化类型 ,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出;

// staging/src/k8s.io/client-go/tools/cache/controller.go type controller struct { 	config         Config 	reflector      *Reflector 	reflectorMutex sync.RWMutex 	clock          clock.Clock }  type Config struct { 	// The queue for your objects; either a FIFO or 	// a DeltaFIFO. Your Process() function should accept 	// the output of this Queue's Pop() method. 	Queue 	... } 


sharedInformerFactory.Start为informer factory的启动方法,其主要逻辑为循环遍历informers,然后跑goroutine调用informer.Run来启动sharedInformerFactory中存储的各个informer。

// staging/src/k8s.io/client-go/informers/factory.go func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { 	f.lock.Lock() 	defer f.lock.Unlock()  	for informerType, informer := range f.informers { 		if !f.startedInformers[informerType] { 			go informer.Run(stopCh) 			f.startedInformers[informerType] = true 		} 	} } 



// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { 	defer utilruntime.HandleCrash()          // 初始化DeltaFIFO 	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)          // 构建Config结构体 	cfg := &Config{ 		Queue:            fifo, 		ListerWatcher:    s.listerWatcher, 		ObjectType:       s.objectType, 		FullResyncPeriod: s.resyncCheckPeriod, 		RetryOnError:     false, 		ShouldResync:     s.processor.shouldResync,  		Process: s.HandleDeltas, 	}  	func() { 		s.startedLock.Lock() 		defer s.startedLock.Unlock()         // 初始化controller 		s.controller = New(cfg) 		s.controller.(*controller).clock = s.clock 		s.started = true 	}()  	// Separate stop channel because Processor should be stopped strictly after controller 	processorStopCh := make(chan struct{}) 	var wg wait.Group 	defer wg.Wait()              // Wait for Processor to stop 	defer close(processorStopCh) // Tell Processor to stop 	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) 	// 启动processor 	wg.StartWithChannel(processorStopCh, s.processor.run)  	defer func() { 		s.startedLock.Lock() 		defer s.startedLock.Unlock() 		s.stopped = true // Don't want any new listeners 	}() 	// 启动controller 	s.controller.Run(stopCh) } 

3.1 New


// staging/src/k8s.io/client-go/tools/cache/controller.go func New(c *Config) Controller { 	ctlr := &controller{ 		config: *c, 		clock:  &clock.RealClock{}, 	} 	return ctlr } 

3.2 s.processor.run


// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *sharedProcessor) run(stopCh <-chan struct{}) { 	func() { 		p.listenersLock.RLock() 		defer p.listenersLock.RUnlock() 		for _, listener := range p.listeners { 			p.wg.Start(listener.run) 			p.wg.Start(listener.pop) 		} 		p.listenersStarted = true 	}() 	<-stopCh 	p.listenersLock.RLock() 	defer p.listenersLock.RUnlock() 	for _, listener := range p.listeners { 		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop 	} 	p.wg.Wait() // Wait for all .pop() and .run() to stop } 

3.3 controller.Run


// k8s.io/client-go/tools/cache/controller.go func (c *controller) Run(stopCh <-chan struct{}) { 	defer utilruntime.HandleCrash() 	go func() { 		<-stopCh 		c.config.Queue.Close() 	}() 	r := NewReflector( 		c.config.ListerWatcher, 		c.config.ObjectType, 		c.config.Queue, 		c.config.FullResyncPeriod, 	) 	r.ShouldResync = c.config.ShouldResync 	r.clock = c.clock  	c.reflectorMutex.Lock() 	c.reflector = r 	c.reflectorMutex.Unlock()  	var wg wait.Group 	defer wg.Wait()  	wg.StartWithChannel(stopCh, r.Run)  	wait.Until(c.processLoop, time.Second, stopCh) } 
3.3.1 Reflector结构体

(3)listerWatcher:存放list方法和watch方法的ListerWatcher interface实现;

// k8s.io/client-go/tools/cache/reflector.go type Reflector struct {     ...     expectedType reflect.Type     store Store     listerWatcher ListerWatcher     ... } 
3.3.2 r.Run/Reflector.Run


// staging/src/k8s.io/client-go/tools/cache/reflector.go func (r *Reflector) Run(stopCh <-chan struct{}) { 	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) 	wait.Until(func() { 		if err := r.ListAndWatch(stopCh); err != nil { 			utilruntime.HandleError(err) 		} 	}, r.period, stopCh) } 
3.3.3 controller.processLoop


// k8s.io/client-go/tools/cache/controller.go func (c *controller) processLoop() { 	for { 		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) 		if err != nil { 			if err == ErrFIFOClosed { 				return 			} 			if c.config.RetryOnError { 				// This is the safe way to re-enqueue. 				c.config.Queue.AddIfNotPresent(obj) 			} 		} 	} } 
3.3.4 c.config.Process/sharedIndexInformer.HandleDeltas



// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { 	s.blockDeltas.Lock() 	defer s.blockDeltas.Unlock()  	// from oldest to newest 	for _, d := range obj.(Deltas) { 		switch d.Type { 		case Sync, Added, Updated: 			isSync := d.Type == Sync 			s.cacheMutationDetector.AddObject(d.Object) 			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { 				if err := s.indexer.Update(d.Object); err != nil { 					return err 				} 				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) 			} else { 				if err := s.indexer.Add(d.Object); err != nil { 					return err 				} 				s.processor.distribute(addNotification{newObj: d.Object}, isSync) 			} 		case Deleted: 			if err := s.indexer.Delete(d.Object); err != nil { 				return err 			} 			s.processor.distribute(deleteNotification{oldObj: d.Object}, false) 		} 	} 	return nil } 


3.3.5 sharedIndexInformer.processor.distribute


// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *sharedProcessor) distribute(obj interface{}, sync bool) { 	p.listenersLock.RLock() 	defer p.listenersLock.RUnlock()  	if sync { 		for _, listener := range p.syncingListeners { 			listener.add(obj) 		} 	} else { 		for _, listener := range p.listeners { 			listener.add(obj) 		} 	} }  func (p *processorListener) add(notification interface{}) { 	p.addCh <- notification } 


3.3.6 listener.pop


// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *processorListener) pop() { 	defer utilruntime.HandleCrash() 	defer close(p.nextCh) // Tell .run() to stop  	var nextCh chan<- interface{} 	var notification interface{} 	for { 		select { 		case nextCh <- notification: 			// Notification dispatched 			var ok bool 			notification, ok = p.pendingNotifications.ReadOne() 			if !ok { // Nothing to pop 				nextCh = nil // Disable this select case 			} 		case notificationToAdd, ok := <-p.addCh: 			if !ok { 				return 			} 			if notification == nil { // No notification to pop (and pendingNotifications is empty) 				// Optimize the case - skip adding to pendingNotifications 				notification = notificationToAdd 				nextCh = p.nextCh 			} else { // There is already a notification waiting to be dispatched 				p.pendingNotifications.WriteOne(notificationToAdd) 			} 		} 	} } 
3.3.7 listener.run


// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (p *processorListener) run() { 	// this call blocks until the channel is closed.  When a panic happens during the notification 	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second) 	// the next notification will be attempted.  This is usually better than the alternative of never 	// delivering again. 	stopCh := make(chan struct{}) 	wait.Until(func() { 		// this gives us a few quick retries before a long pause and then a few more quick retries 		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) { 			for next := range p.nextCh { 				switch notification := next.(type) { 				case updateNotification: 					p.handler.OnUpdate(notification.oldObj, notification.newObj) 				case addNotification: 					p.handler.OnAdd(notification.newObj) 				case deleteNotification: 					p.handler.OnDelete(notification.oldObj) 				default: 					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) 				} 			} 			// the only way to get here is if the p.nextCh is empty and closed 			return true, nil 		})  		// the only way to get here is if the p.nextCh is empty and closed 		if err == nil { 			close(stopCh) 		} 	}, 1*time.Minute, stopCh) }  


informer.AddEventHandler(cache.ResourceEventHandlerFuncs{     AddFunc:    onAdd,     UpdateFunc: onUpdate,     DeleteFunc: onDelete,   }) 
// staging/src/k8s.io/client-go/tools/cache/controller.go type ResourceEventHandlerFuncs struct { 	AddFunc    func(obj interface{}) 	UpdateFunc func(oldObj, newObj interface{}) 	DeleteFunc func(obj interface{}) }  func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) { 	if r.AddFunc != nil { 		r.AddFunc(obj) 	} }  func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) { 	if r.UpdateFunc != nil { 		r.UpdateFunc(oldObj, newObj) 	} }  func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) { 	if r.DeleteFunc != nil { 		r.DeleteFunc(obj) 	} } 

4.cache.WaitForCacheSync(stopper, informer.HasSynced)

可以看出在cache.WaitForCacheSync方法中,实际上是调用方法入参cacheSyncs ...InformerSynced来判断cache是否同步完成(即调用informer.HasSynced方法),而这里说的cache同步完成,意思是等待informer从kube-apiserver同步资源完成,即informer的list操作获取的对象都存入到informer中的indexer本地缓存中;

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 	err := wait.PollImmediateUntil(syncedPollPeriod, 		func() (bool, error) { 			for _, syncFunc := range cacheSyncs { 				if !syncFunc() { 					return false, nil 				} 			} 			return true, nil 		}, 		stopCh) 	if err != nil { 		klog.V(2).Infof("stop requested") 		return false 	}  	klog.V(4).Infof("caches populated") 	return true } 

4.1 informer.HasSynced


// staging/src/k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) HasSynced() bool { 	s.startedLock.Lock() 	defer s.startedLock.Unlock()  	if s.controller == nil { 		return false 	} 	return s.controller.HasSynced() } 


// staging/src/k8s.io/client-go/tools/cache/controller.go func (c *controller) HasSynced() bool { 	return c.config.Queue.HasSynced() } 

4.2 sharedInformerFactory.WaitForCacheSync


// staging/src/k8s.io/client-go/informers/factory.go func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { 	informers := func() map[reflect.Type]cache.SharedIndexInformer { 		f.lock.Lock() 		defer f.lock.Unlock()  		informers := map[reflect.Type]cache.SharedIndexInformer{} 		for informerType, informer := range f.informers { 			if f.startedInformers[informerType] { 				informers[informerType] = informer 			} 		} 		return informers 	}()  	res := map[reflect.Type]bool{} 	for informType, informer := range informers { 		res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) 	} 	return res } 





