k8s client-go源码分析 informer源码分析(4)-DeltaFIFO源码分析

client-go之DeltaFIFO源码分析

1.DeltaFIFO概述

先从名字上来看,DeltaFIFO,首先它是一个FIFO,也就是一个先进先出的队列,而Delta代表变化的资源对象,其包含资源对象数据本身及其变化类型。

Delta的组成:

type Delta struct {     Type   DeltaType     Object interface{} } 

DeltaFIFO的组成:

type DeltaFIFO struct {     ...     items map[string]Deltas 	queue []string     ... }  type Deltas []Delta 

具体来说,DeltaFIFO存储着map[object key]Deltas以及object key的queue,Delta装有对象数据及对象的变化类型。输入输出方面,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出。

一个对象能算出一个唯一的object key,其对应着一个Deltas,所以一个对象对应着一个Deltas。

而目前Delta有4种Type,分别是: Added、Updated、Deleted、Sync。针对同一个对象,可能有多个不同Type的Delta元素在Deltas中,表示对该对象做了不同的操作,另外,也可能有多个相同Type的Delta元素在Deltas中(除Deleted外,Delted类型会被去重),比如短时间内,多次对某一个对象进行了更新操作,那么就会有多个Updated类型的Delta放入Deltas中。

k8s client-go源码分析 informer源码分析(4)-DeltaFIFO源码分析

2.DeltaFIFO的定义与初始化分析

2.1 DeltaFIFO struct

DeltaFIFO struct定义了DeltaFIFO的一些属性,下面挑几个重要的分析一下。

(1)lock:读写锁,操作DeltaFIFO中的items与queue之前都要先加锁;
(2)items:是个map,key根据对象算出,value为Deltas类型;
(3)queue:存储对象key的队列;
(4)keyFunc:计算对象key的函数;

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go type DeltaFIFO struct { 	// lock/cond protects access to 'items' and 'queue'. 	lock sync.RWMutex 	cond sync.Cond  	// We depend on the property that items in the set are in 	// the queue and vice versa, and that all Deltas in this 	// map have at least one Delta. 	items map[string]Deltas 	queue []string  	// populated is true if the first batch of items inserted by Replace() has been populated 	// or Delete/Add/Update was called first. 	populated bool 	// initialPopulationCount is the number of items inserted by the first call of Replace() 	initialPopulationCount int  	// keyFunc is used to make the key used for queued item 	// insertion and retrieval, and should be deterministic. 	keyFunc KeyFunc  	// knownObjects list keys that are "known", for the 	// purpose of figuring out which items have been deleted 	// when Replace() or Delete() is called. 	knownObjects KeyListerGetter  	// Indication the queue is closed. 	// Used to indicate a queue is closed so a control loop can exit when a queue is empty. 	// Currently, not used to gate any of CRED operations. 	closed     bool 	closedLock sync.Mutex  
type Deltas

再来看一下Deltas类型,是Delta的切片类型。

type Deltas []Delta 
type Delta

继续看到Delta类型,其包含两个属性:
(1)Type:代表的是Delta的类型,有Added、Updated、Deleted、Sync四个类型;
(2)Object:存储的资源对象,如pod等资源对象;

type Delta struct { 	Type   DeltaType 	Object interface{} } 
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go type DeltaType string  // Change type definition const ( 	Added   DeltaType = "Added" 	Updated DeltaType = "Updated" 	Deleted DeltaType = "Deleted" 	// The other types are obvious. You'll get Sync deltas when: 	//  * A watch expires/errors out and a new list/watch cycle is started. 	//  * You've turned on periodic syncs. 	// (Anything that trigger's DeltaFIFO's Replace() method.) 	Sync DeltaType = "Sync" ) 

2.2 DeltaFIFO初始化-NewDeltaFIFO

NewDeltaFIFO初始化了一个items和queue都为空的DeltaFIFO并返回。

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { 	f := &DeltaFIFO{ 		items:        map[string]Deltas{}, 		queue:        []string{}, 		keyFunc:      keyFunc, 		knownObjects: knownObjects, 	} 	f.cond.L = &f.lock 	return f } 

3.DeltaFIFO核心处理方法分析

在前面分析Reflector时,Reflector的核心处理方法里有调用过几个方法,分别是r.store.Replace、r.store.Add、r.store.Update、r.store.Delete,结合前面文章的k8s informer的初始化与启动分析,或者简要的看一下下面的代码调用,就可以知道Reflector里的r.store其实就是DeltaFIFO,而那几个方法其实就是DeltaFIFO的Replace、Add、Update、Delete方法。

sharedIndexInformer.Run方法中调用NewDeltaFIFO初始化了DeltaFIFO,随后将DeltaFIFO作为参数传入初始化Config;

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {     ...     fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)          cfg := &Config{ 		Queue:            fifo, 		... 	} 	 	func() { 		... 		s.controller = New(cfg) 		... 	}() 	... 	s.controller.Run(stopCh) 

在controller的Run方法中,调用NewReflector初始化Reflector时,将之前的DeltaFIFO传入,赋值给Reflector的store属性,所以Reflector里的r.store其实就是DeltaFIFO,而调用的r.store.Replace、r.store.Add、r.store.Update、r.store.Delete方法其实就是DeltaFIFO的Replace、Add、Update、Delete方法。

func (c *controller) Run(stopCh <-chan struct{}) { 	... 	r := NewReflector( 		c.config.ListerWatcher, 		c.config.ObjectType, 		c.config.Queue, 		c.config.FullResyncPeriod, 	) 	... } 
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { 	return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod) }  func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector { 	r := &Reflector{ 		... 		store:         store, 		... 	} 	... 	return r } 

所以这里对DeltaFIFO核心处理方法进行分析,主要是分析DeltaFIFO的Replace、Add、Update、Delete方法。

3.1 DeltaFIFO.Add

DeltaFIFO的Add操作,主要逻辑:
(1)加锁;
(2)调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Added类型的新Delta追加到相应的Deltas中;
(3)释放锁。

func (f *DeltaFIFO) Add(obj interface{}) error { 	f.lock.Lock() 	defer f.lock.Unlock() 	f.populated = true 	return f.queueActionLocked(Added, obj) } 

可以看到基本上DeltaFIFO所有的操作都有加锁操作,所以都是并发安全的。

3.1.1 DeltaFIFO.queueActionLocked

queueActionLocked负责操作DeltaFIFO中的queue与Deltas,根据对象key构造新的Delta追加到对应的Deltas中,主要逻辑:
(1)计算出对象的key;
(2)构造新的Delta,将新的Delta追加到Deltas末尾;
(3)调用dedupDeltas将Delta去重(目前只将Deltas最末尾的两个delete类型的Delta去重);
(4)判断对象的key是否在queue中,不在则添加入queue中;
(5)根据对象key更新items中的Deltas;
(6)通知所有的消费者解除阻塞;

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {     //(1)计算出对象的key 	id, err := f.KeyOf(obj) 	if err != nil { 		return KeyError{obj, err} 	}     //(2)构造新的Delta,将新的Delta追加到Deltas末尾 	newDeltas := append(f.items[id], Delta{actionType, obj}) 	//(3)调用dedupDeltas将Delta去重(目前只将Deltas最末尾的两个delete类型的Delta去重) 	newDeltas = dedupDeltas(newDeltas)  	if len(newDeltas) > 0 { 	    //(4)判断对象的key是否在queue中,不在则添加入queue中 		if _, exists := f.items[id]; !exists { 			f.queue = append(f.queue, id) 		} 		//(5)根据对象key更新items中的Deltas 		f.items[id] = newDeltas 		//(6)通知所有的消费者解除阻塞 		f.cond.Broadcast() 	} else { 		// We need to remove this from our map (extra items in the queue are 		// ignored if they are not in the map). 		delete(f.items, id) 	} 	return nil } 

3.2 DeltaFIFO.Update

DeltaFIFO的Update操作,主要逻辑:
(1)加锁;
(2)调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Updated类型的新Delta追加到相应的Deltas中;
(3)释放锁。

func (f *DeltaFIFO) Update(obj interface{}) error { 	f.lock.Lock() 	defer f.lock.Unlock() 	f.populated = true 	return f.queueActionLocked(Updated, obj) } 

3.3 DeltaFIFO.Delete

DeltaFIFO的Delete操作,主要逻辑:
(1)计算出对象的key;
(2)加锁;
(3)items中不存在对象key,则直接return,跳过处理;
(4)调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Deleted类型的新Delta追加到相应的Deltas中;
(5)释放锁。

func (f *DeltaFIFO) Delete(obj interface{}) error { 	id, err := f.KeyOf(obj) 	if err != nil { 		return KeyError{obj, err} 	} 	f.lock.Lock() 	defer f.lock.Unlock() 	f.populated = true 	// informer的用法中,f.knownObjects不为nil 	if f.knownObjects == nil { 		if _, exists := f.items[id]; !exists { 			// Presumably, this was deleted when a relist happened. 			// Don't provide a second report of the same deletion. 			return nil 		} 	} else { 		// We only want to skip the "deletion" action if the object doesn't 		// exist in knownObjects and it doesn't have corresponding item in items. 		// Note that even if there is a "deletion" action in items, we can ignore it, 		// because it will be deduped automatically in "queueActionLocked" 		_, exists, err := f.knownObjects.GetByKey(id) 		_, itemsExist := f.items[id] 		if err == nil && !exists && !itemsExist { 			// Presumably, this was deleted when a relist happened. 			// Don't provide a second report of the same deletion. 			return nil 		} 	}  	return f.queueActionLocked(Deleted, obj) } 

3.4 DeltaFIFO.Replace

DeltaFIFO的Replace操作,主要逻辑:
(1)加锁;
(2)遍历list,计算对象的key,循环调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Sync类型的新Delta追加到相应的Deltas中;
(3)对比DeltaFIFO中的items与Replace方法的list,如果DeltaFIFO中的items有,但传进来Replace方法的list中没有某个key,则调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Deleted类型的新Delta追加到相应的Deltas中(避免重复,使用DeletedFinalStateUnknown包装对象);
(4)释放锁;

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {     //(1)加锁 	f.lock.Lock() 	//(4)释放锁 	defer f.lock.Unlock() 	keys := make(sets.String, len(list))      //(2)遍历list,计算对象的key,循环调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Sync类型的新Delta追加到相应的Deltas中 	for _, item := range list { 		key, err := f.KeyOf(item) 		if err != nil { 			return KeyError{item, err} 		} 		keys.Insert(key) 		if err := f.queueActionLocked(Sync, item); err != nil { 			return fmt.Errorf("couldn't enqueue object: %v", err) 		} 	}     // informer的用法中,f.knownObjects不为nil 	if f.knownObjects == nil { 		// Do deletion detection against our own list. 		queuedDeletions := 0 		for k, oldItem := range f.items { 			if keys.Has(k) { 				continue 			} 			var deletedObj interface{} 			if n := oldItem.Newest(); n != nil { 				deletedObj = n.Object 			} 			queuedDeletions++ 			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { 				return err 			} 		}          		if !f.populated { 			f.populated = true 			// While there shouldn't be any queued deletions in the initial 			// population of the queue, it's better to be on the safe side. 			f.initialPopulationCount = len(list) + queuedDeletions 		}  		return nil 	}          //(3)找出DeltaFIFO中的items有,但传进来Replace方法的list中没有的key,调用f.queueActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Deleted类型的新Delta追加到相应的Deltas中(避免重复,使用DeletedFinalStateUnknown包装对象) 	// Detect deletions not already in the queue. 	knownKeys := f.knownObjects.ListKeys() 	queuedDeletions := 0 	for _, k := range knownKeys { 		if keys.Has(k) { 			continue 		}  		deletedObj, exists, err := f.knownObjects.GetByKey(k) 		if err != nil { 			deletedObj = nil 			klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) 		} else if !exists { 			deletedObj = nil 			klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) 		} 		queuedDeletions++ 		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { 			return err 		} 	}          // 第一次调用Replace方法后,populated值为true 	if !f.populated { 		f.populated = true 		// initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的items数量 		f.initialPopulationCount = len(list) + queuedDeletions 	}  	return nil } 

3.5 DeltaFIFO.Pop

DeltaFIFO的Pop操作,queue为空时会阻塞,直至非空,主要逻辑:
(1)加锁;
(2)循环判断queue的长度是否为0,为0则阻塞住,调用f.cond.Wait(),等待通知(与queueActionLocked方法中的f.cond.Broadcast()相对应,即queue中有对象key则发起通知);
(3)取出queue的队头对象key;
(4)更新queue,把queue中所有的对象key前移,相当于把第一个对象key给pop出去;
(5)initialPopulationCount变量减1,当减到0时则说明initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的对象key已经被pop完成;
(6)根据对象key从items中获取Deltas;
(7)把Deltas从items中删除;
(8)调用PopProcessFunc处理获取到的Deltas;
(9)释放锁。

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {     //(1)加锁 	f.lock.Lock() 	//(9)释放锁 	defer f.lock.Unlock() 	//(2)循环判断queue的长度是否为0,为0则阻塞住,调用f.cond.Wait(),等待通知(与queueActionLocked方法中的f.cond.Broadcast()相对应,即queue中有对象key则发起通知) 	for { 		for len(f.queue) == 0 { 			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued. 			// When Close() is called, the f.closed is set and the condition is broadcasted. 			// Which causes this loop to continue and return from the Pop(). 			if f.IsClosed() { 				return nil, ErrFIFOClosed 			}  			f.cond.Wait() 		} 		//(3)取出queue的队头对象key 		id := f.queue[0] 		//(4)更新queue,把queue中所有的对象key前移,相当于把第一个对象key给pop出去 		f.queue = f.queue[1:] 		//(5)initialPopulationCount变量减1,当减到0时则说明initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的对象key已经被pop完成 		if f.initialPopulationCount > 0 { 			f.initialPopulationCount-- 		} 		//(6)根据对象key从items中获取对象 		item, ok := f.items[id] 		if !ok { 			// Item may have been deleted subsequently. 			continue 		} 		//(7)把对象从items中删除 		delete(f.items, id) 		//(8)调用PopProcessFunc处理pop出来的对象 		err := process(item) 		if e, ok := err.(ErrRequeue); ok { 			f.addIfNotPresent(id, item) 			err = e.Err 		} 		// Don't need to copyDeltas here, because we're transferring 		// ownership to the caller. 		return item, err 	} } 

3.6 DeltaFIFO.HasSynced

HasSynced从字面意思上看代表是否同步完成,是否同步完成其实是指第一次从kube-apiserver中获取到的全量的对象是否全部从DeltaFIFO中pop完成,全部pop完成,说明list回来的对象已经全部同步到了Indexer缓存中去了。

方法是否返回true是根据populated和initialPopulationCount两个变量来判断的,当且仅当populated为true且initialPopulationCount 为0的时候方法返回true,否则返回false。

populated属性值在第一次调用DeltaFIFO的Replace方法中就已经将其值设置为true。

而initialPopulationCount的值在第一次调用DeltaFIFO的Replace方法中设置值为加入到items中的Deltas的数量,然后每pop一个Deltas,则initialPopulationCount的值减1,pop完成时值则为0。

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go func (f *DeltaFIFO) HasSynced() bool { 	f.lock.Lock() 	defer f.lock.Unlock() 	return f.populated && f.initialPopulationCount == 0 } 

在前面做informer的初始化与启动分析时也提到过,DeltaFIFO.HasSynced方法的调用链如下:

sharedIndexInformer.WaitForCacheSync --> cache.WaitForCacheSync --> sharedIndexInformer.controller.HasSynced --> controller.config.Queue.HasSynced --> DeltaFIFO.HasSynced

至此DeltaFIFO的分析就结束了,最后来总结一下。

总结

DeltaFIFO核心处理方法

Reflector调用的r.store.Replacer.store.Addr.store.Updater.store.Delete方法其实就是DeltaFIFO的Replace、Add、Update、Delete方法。

(1)DeltaFIFO.Replace:构造Sync类型的Delta加入DeltaFIFO中,此外还会对比DeltaFIFO中的items与Replace方法的list,如果DeltaFIFO中的items有,但传进来Replace方法的list中没有某个key,则构造Deleted类型的Delta加入DeltaFIFO中;
(2)DeltaFIFO.Add:构建Added类型的Delta加入DeltaFIFO中;
(3)DeltaFIFO.Update:构建Updated类型的Delta加入DeltaFIFO中;
(4)DeltaFIFO.Delete:构建Deleted类型的Delta加入DeltaFIFO中;
(5)DeltaFIFO.Pop:从DeltaFIFO的queue中pop出队头key,从map中取出key对应的Deltas返回,并把该key:Deltas从map中移除;
(6)DeltaFIFO.HasSynced:返回true代表同步完成,是否同步完成指第一次从kube-apiserver中获取到的全量的对象是否全部从DeltaFIFO中pop完成,全部pop完成,说明list回来的对象已经全部同步到了Indexer缓存中去了;

informer架构中的DeltaFIFO

k8s client-go源码分析 informer源码分析(4)-DeltaFIFO源码分析

在对informer中的DeltaFIFO分析完之后,接下来将分析informer中的Controller与Processor。

发表评论

相关文章