浅析kubernetes中client-go Informer

之前了解了client-go中的架构设计,也就是 tools/cache 下面的一些概念,那么下面将对informer进行分析

Controller

在client-go informer架构中存在一个 controller ,这个不是 Kubernetes 中的Controller组件;而是在 tools/cache 中的一个概念,controller 位于 informer 之下,Reflector 之上。code

Config

从严格意义上来讲,controller 是作为一个 sharedInformer 使用,通过接受一个 Config ,而 Reflector 则作为 controller 的 slot。Config 则包含了这个 controller 里所有的设置。

type Config struct { 	Queue // DeltaFIFO 	ListerWatcher // 用于list watch的 	Process ProcessFunc // 定义如何从DeltaFIFO中弹出数据后处理的操作 	ObjectType runtime.Object // Controller处理的对象数据,实际上就是kubernetes中的资源 	FullResyncPeriod time.Duration // 全量同步的周期 	ShouldResync ShouldResyncFunc // Reflector通过该标记来确定是否应该重新同步 	RetryOnError bool } 

controller

然后 controller 又为 reflertor 的上层

type controller struct { 	config         Config 	reflector      *Reflector  	reflectorMutex sync.RWMutex 	clock          clock.Clock }  type Controller interface { 	// controller 主要做两件事,     // 1. 构建并运行 Reflector,将listerwacther中的泵压到queue(Delta fifo)中     // 2. Queue用Pop()弹出数据,具体的操作是Process     // 直到 stopCh 不阻塞,这两个协程将退出 	Run(stopCh <-chan struct{}) 	HasSynced() bool // 这个实际上是从store中继承的,标记这个controller已经 	LastSyncResourceVersion() string } 

controller 中的方法,仅有一个 Run()New();这意味着,controller 只是一个抽象的概念,作为 Reflector, Delta FIFO 整合的工作流

浅析kubernetes中client-go Informer

controller 则是 SharedInformer 了。

Queue

这里的 queue 可以理解为是一个具有 Pop() 功能的 Indexer ;而 Pop() 的功能则是 controller 中的一部分;也就是说 queue 是一个扩展的 StoreStore 是不具备弹出功能的。

type Queue interface { 	Store 	// Pop会阻塞等待,直到有内容弹出,删除对应的值并处理计数器 	Pop(PopProcessFunc) (interface{}, error)  	// AddIfNotPresent puts the given accumulator into the Queue (in 	// association with the accumulator's key) if and only if that key 	// is not already associated with a non-empty accumulator. 	AddIfNotPresent(interface{}) error  	// HasSynced returns true if the first batch of keys have all been 	// popped.  The first batch of keys are those of the first Replace 	// operation if that happened before any Add, Update, or Delete; 	// otherwise the first batch is empty. 	HasSynced() bool 	Close() // 关闭queue } 

而弹出的操作是通过 controller 中的 processLoop() 进行的,最终走到Delta FIFO中进行处理。

通过忙等待去读取要弹出的数据,然后在弹出前 通过PopProcessFunc 进行处理

func (c *controller) processLoop() { 	for { 		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) 		if err != nil { 			if err == ErrFIFOClosed { 				return 			} 			if c.config.RetryOnError { 				// This is the safe way to re-enqueue. 				c.config.Queue.AddIfNotPresent(obj) 			} 		} 	} } 

DeltaFIFO.Pop()

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { 	f.lock.Lock() 	defer f.lock.Unlock() 	for { 		for len(f.queue) == 0 { 			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued. 			// When Close() is called, the f.closed is set and the condition is broadcasted. 			// Which causes this loop to continue and return from the Pop(). 			if f.IsClosed() { 				return nil, ErrFIFOClosed 			}  			f.cond.Wait() 		} 		id := f.queue[0] 		f.queue = f.queue[1:] 		if f.initialPopulationCount > 0 { 			f.initialPopulationCount-- 		} 		item, ok := f.items[id] 		if !ok { 			// Item may have been deleted subsequently. 			continue 		} 		delete(f.items, id) 		err := process(item) // 进行处理 		if e, ok := err.(ErrRequeue); ok { 			f.addIfNotPresent(id, item) // 如果失败,再重新加入到队列中 			err = e.Err  		} 		// Don't need to copyDeltas here, because we're transferring 		// ownership to the caller. 		return item, err 	} } 

Informer

通过对 Reflector, Store, Queue, ListerWatcherProcessFunc, 等的概念,发现由 controller 所包装的起的功能并不能完成通过对API的动作监听,并通过动作来处理本地缓存的一个能力;这个情况下诞生了 informer 严格意义上来讲是 sharedInformer

func newInformer( 	lw ListerWatcher, 	objType runtime.Object, 	resyncPeriod time.Duration, 	h ResourceEventHandler, 	clientState Store, ) Controller { 	// This will hold incoming changes. Note how we pass clientState in as a 	// KeyLister, that way resync operations will result in the correct set 	// of update/delete deltas. 	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 		KnownObjects:          clientState, 		EmitDeltaTypeReplaced: true, 	})  	cfg := &Config{ 		Queue:            fifo, 		ListerWatcher:    lw, 		ObjectType:       objType, 		FullResyncPeriod: resyncPeriod, 		RetryOnError:     false,  		Process: func(obj interface{}) error { 			// from oldest to newest 			for _, d := range obj.(Deltas) { 				switch d.Type { 				case Sync, Replaced, Added, Updated: 					if old, exists, err := clientState.Get(d.Object); err == nil && exists { 						if err := clientState.Update(d.Object); err != nil { 							return err 						} 						h.OnUpdate(old, d.Object) 					} else { 						if err := clientState.Add(d.Object); err != nil { 							return err 						} 						h.OnAdd(d.Object) 					} 				case Deleted: 					if err := clientState.Delete(d.Object); err != nil { 						return err 					} 					h.OnDelete(d.Object) 				} 			} 			return nil 		}, 	} 	return New(cfg) } 

newInformer是位于 tools/cache/controller.go 下,可以看出,这里面并没有informer的概念,这里通过注释可以看到,newInformer实际上是一个提供了存储和事件通知的informer。他关联的 queue 则是 Delta FIFO,并包含了 ProcessFunc, Store 等 controller的概念。最终对外的方法为 NewInformer()

func NewInformer( 	lw ListerWatcher, 	objType runtime.Object, 	resyncPeriod time.Duration, 	h ResourceEventHandler, ) (Store, Controller) { 	// This will hold the client state, as we know it. 	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)  	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState) }  type ResourceEventHandler interface { 	OnAdd(obj interface{}) 	OnUpdate(oldObj, newObj interface{}) 	OnDelete(obj interface{}) } 

可以看到 NewInformer() 就是一个带有 Store功能的controller,通过这些可以假定出,Informer 就是controller ,将queue中相关操作分发给不同事件处理的功能

SharedIndexInformer

shareInformer 为客户端提供了与apiserver一致的数据对象本地缓存,并支持多事件处理程序的informer,而 shareIndexInformer 则是对shareInformer 的扩展

type SharedInformer interface { 	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync 	// period.  Events to a single handler are delivered sequentially, but there is no coordination 	// between different handlers. 	AddEventHandler(handler ResourceEventHandler) 	// AddEventHandlerWithResyncPeriod adds an event handler to the 	// shared informer with the requested resync period; zero means 	// this handler does not care about resyncs.  The resync operation 	// consists of delivering to the handler an update notification 	// for every object in the informer's local cache; it does not add 	// any interactions with the authoritative storage.  Some 	// informers do no resyncs at all, not even for handlers added 	// with a non-zero resyncPeriod.  For an informer that does 	// resyncs, and for each handler that requests resyncs, that 	// informer develops a nominal resync period that is no shorter 	// than the requested period but may be longer.  The actual time 	// between any two resyncs may be longer than the nominal period 	// because the implementation takes time to do work and there may 	// be competing load and scheduling noise. 	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) 	// GetStore returns the informer's local cache as a Store. 	GetStore() Store 	// GetController is deprecated, it does nothing useful 	GetController() Controller 	// Run starts and runs the shared informer, returning after it stops. 	// The informer will be stopped when stopCh is closed. 	Run(stopCh <-chan struct{}) 	// HasSynced returns true if the shared informer's store has been 	// informed by at least one full LIST of the authoritative state 	// of the informer's object collection.  This is unrelated to "resync". 	HasSynced() bool 	// LastSyncResourceVersion is the resource version observed when last synced with the underlying 	// store. The value returned is not synchronized with access to the underlying store and is not 	// thread-safe. 	LastSyncResourceVersion() string } 

SharedIndexInformer 是对SharedInformer的实现,可以从结构中看出,SharedIndexInformer 大致具有如下功能:

  • 索引本地缓存
  • controller,通过list watch拉取API并推入 Deltal FIFO
  • 事件的处理
type sharedIndexInformer struct { 	indexer    Indexer // 具有索引的本地缓存 	controller Controller // controller  	processor             *sharedProcessor // 事件处理函数集合 	cacheMutationDetector MutationDetector  	listerWatcher ListerWatcher 	objectType runtime.Object 	resyncCheckPeriod time.Duration 	defaultEventHandlerResyncPeriod time.Duration 	clock clock.Clock 	started, stopped bool 	startedLock      sync.Mutex 	blockDeltas sync.Mutex } 

而在 tools/cache/share_informer.go 可以看到 shareIndexInformer 的运行过程

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { 	defer utilruntime.HandleCrash()  	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 		KnownObjects:          s.indexer, 		EmitDeltaTypeReplaced: true, 	})  	cfg := &Config{ 		Queue:            fifo, 		ListerWatcher:    s.listerWatcher, 		ObjectType:       s.objectType, 		FullResyncPeriod: s.resyncCheckPeriod, 		RetryOnError:     false, 		ShouldResync:     s.processor.shouldResync,  		Process: s.HandleDeltas, // process 弹出时操作的流程 	}  	func() { 		s.startedLock.Lock() 		defer s.startedLock.Unlock()  		s.controller = New(cfg) 		s.controller.(*controller).clock = s.clock 		s.started = true 	}()  	// Separate stop channel because Processor should be stopped strictly after controller 	processorStopCh := make(chan struct{}) 	var wg wait.Group 	defer wg.Wait()              // Wait for Processor to stop 	defer close(processorStopCh) // Tell Processor to stop 	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) 	wg.StartWithChannel(processorStopCh, s.processor.run) // 启动事件处理函数  	defer func() { 		s.startedLock.Lock() 		defer s.startedLock.Unlock() 		s.stopped = true // Don't want any new listeners 	}()     s.controller.Run(stopCh) // 启动controller,controller会启动Reflector和fifo的Pop() } 

而在操作Delta FIFO中可以看到,做具体操作时,会将动作分发至对应的事件处理函数中,这个是informer初始化时对事件操作的函数

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { 	s.blockDeltas.Lock() 	defer s.blockDeltas.Unlock()   	for _, d := range obj.(Deltas) { 		switch d.Type { 		case Sync, Replaced, Added, Updated: 			s.cacheMutationDetector.AddObject(d.Object) 			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { 				if err := s.indexer.Update(d.Object); err != nil { 					return err 				}  				isSync := false 				switch { 				case d.Type == Sync: 					isSync = true 				case d.Type == Replaced: 					if accessor, err := meta.Accessor(d.Object); err == nil { 						if oldAccessor, err := meta.Accessor(old); err == nil { 							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() 						} 					} 				}                 // 事件的分发 				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) 			} else { 				if err := s.indexer.Add(d.Object); err != nil { 					return err 				}                 // 事件的分发 				s.processor.distribute(addNotification{newObj: d.Object}, false) 			} 		case Deleted: 			if err := s.indexer.Delete(d.Object); err != nil { 				return err 			} 			s.processor.distribute(deleteNotification{oldObj: d.Object}, false) 		} 	} 	return nil } 

事件处理函数 processor

启动informer时也会启动注册进来的事件处理函数;processor 就是这个事件处理函数。

run() 函数会启动两个 listener,j监听事件处理业务函数 listener.run 和 事件的处理

wg.StartWithChannel(processorStopCh, s.processor.run)  func (p *sharedProcessor) run(stopCh <-chan struct{}) { 	func() { 		p.listenersLock.RLock() 		defer p.listenersLock.RUnlock() 		for _, listener := range p.listeners { 			p.wg.Start(listener.run)  			p.wg.Start(listener.pop) 		} 		p.listenersStarted = true 	}() 	<-stopCh 	p.listenersLock.RLock() 	defer p.listenersLock.RUnlock() 	for _, listener := range p.listeners { 		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop 	} 	p.wg.Wait() // Wait for all .pop() and .run() to stop } 

可以看出,就是拿到的事件,根据注册的到informer的事件函数进行处理

func (p *processorListener) run() { 	stopCh := make(chan struct{}) 	wait.Until(func() { 		for next := range p.nextCh { // 消费事件 			switch notification := next.(type) { 			case updateNotification: 				p.handler.OnUpdate(notification.oldObj, notification.newObj) 			case addNotification: 				p.handler.OnAdd(notification.newObj) 			case deleteNotification: 				p.handler.OnDelete(notification.oldObj) 			default: 				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) 			} 		} 		// the only way to get here is if the p.nextCh is empty and closed 		close(stopCh) 	}, 1*time.Second, stopCh) } 

informer中的事件的设计

了解了informer如何处理事件,就需要学习下,informer的事件系统设计 prossorListener

事件的添加

当在handleDelta时,会分发具体的事件

// 事件的分发 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) 

此时,事件泵 Pop() 会根据接收到的事件进行处理

// run() 时会启动一个事件泵 p.wg.Start(listener.pop)  func (p *processorListener) pop() { 	defer utilruntime.HandleCrash() 	defer close(p.nextCh)   	var nextCh chan<- interface{} 	var notification interface{} 	for { 		select {         case nextCh <- notification: // 这里实际上是一个阻塞的等待             // 单向channel 可能不会走到这步骤 			var ok bool             // deltahandle 中 distribute 会将事件添加到addCh待处理事件中             // 处理完事件会再次拿到一个事件 			notification, ok = p.pendingNotifications.ReadOne() 			if !ok { // Nothing to pop 				nextCh = nil // Disable this select case 			}         // 处理 分发过来的事件 addCh 		case notificationToAdd, ok := <-p.addCh: // distribute分发的事件 			if !ok { 				return 			}             // 这里代表第一次,没有任何事件时,或者上面步骤完成读取 			if notification == nil { // 就会走这里 				notification = notificationToAdd  				nextCh = p.nextCh  			} else {                  // notification否则代表没有处理完,将数据再次添加到待处理中 				p.pendingNotifications.WriteOne(notificationToAdd) 			} 		} 	} } 

该消息事件的流程图为

浅析kubernetes中client-go Informer

通过一个简单实例来学习client-go中的消息通知机制

package main  import ( 	"fmt" 	"time"  	"k8s.io/utils/buffer" )  var nextCh1 = make(chan interface{}) var addCh = make(chan interface{}) var stopper = make(chan struct{}) var notification interface{} var pendding = *buffer.NewRingGrowing(2)  func main() { 	// pop 	go func() { 		var nextCh chan<- interface{} 		var notification interface{} 		//var n int 		for { 			fmt.Println("busy wait") 			fmt.Println("entry select", notification) 			select { 			// 初始时,一个未初始化的channel,nil,形成一个阻塞(单channel下是死锁) 			case nextCh <- notification: 				fmt.Println("entry nextCh", notification) 				var ok bool 				// 读不到数据代表已处理完,置空锁 				notification, ok = pendding.ReadOne() 				if !ok { 					fmt.Println("unactive nextch") 					nextCh = nil 				} 			// 事件的分发,监听,初始时也是一个阻塞 			case notificationToAdd, ok := <-addCh: 				fmt.Println(notificationToAdd, notification) 				if !ok { 					return 				} 				// 线程安全 				// 当消息为空时,没有被处理 				// 锁为空,就分发数据 				if notification == nil { 					fmt.Println("frist notification nil") 					notification = notificationToAdd 					nextCh = nextCh1 // 这步骤等于初始化了局部的nextCh,会触发上面的流程 				} else { 					// 在第三次时,会走到这里,数据进入环 					fmt.Println("into ring", notificationToAdd) 					pendding.WriteOne(notificationToAdd) 				} 			} 		} 	}() 	// producer 	go func() { 		i := 0 		for { 			i++ 			if i%5 == 0 { 				addCh <- fmt.Sprintf("thread 2 inner -- %d", i) 				time.Sleep(time.Millisecond * 9000) 			} else { 				addCh <- fmt.Sprintf("thread 2 outer -- %d", i) 				time.Sleep(time.Millisecond * 500) 			} 		} 	}() 	// subsriber 	go func() { 		for { 			for next := range nextCh1 { 				time.Sleep(time.Millisecond * 300) 				fmt.Println("consumer", next) 			} 		} 	}() 	<-stopper } 

总结,这里的机制类似于线程安全,进入临界区的一些算法,临界区就是 nextChnotification 就是保证了至少有一个进程可以进入临界区(要么分发事件,要么生产事件);nextChnextCh1 一个是局部管道一个是全局的,管道未初始化代表了死锁(阻塞);当有消息要处理时,会将局部管道 nextCh 赋值给 全局 nextCh1 此时相当于解除了分发的步骤(对管道赋值,触发分发操作);ringbuffer 实际上是提供了一个对 notification 加锁的操作,在没有处理的消息时,需要保障 notification 为空,同时也关闭了流程 nextCh 的写入。这里主要是考虑对golang中channel的用法

发表评论

相关文章