k8s驱逐篇(7)-kube-controller-manager驱逐-taintManager源码分析

概述

taintManager的主要功能为:当某个node被打上NoExecute污点后,其上面的pod如果不能容忍该污点,则taintManager将会驱逐这些pod,而新建的pod也需要容忍该污点才能调度到该node上;

通过kcm启动参数--enable-taint-manager来确定是否启动taintManagertrue时启动(启动参数默认值为true);

kcm启动参数--feature-gates=TaintBasedEvictions=xxx,默认值true,配合--enable-taint-manager共同作用,两者均为true,才会开启污点驱逐;

kcm污点驱逐

当node出现NoExecute污点时,判断node上的pod是否能容忍node的污点,不能容忍的pod,会被立即删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,pod被删除;

源码分析

1.结构体分析

1.1 NoExecuteTaintManager结构体分析

NoExecuteTaintManager结构体为taintManager的主要结构体,其主要属性有:
(1)taintEvictionQueue:不能容忍node上NoExecute的污点的pod,会被加入到该队列中,然后pod会被删除;
(2)taintedNodes:记录了每个node的taint;
(3)nodeUpdateQueue:当node对象发生add、delete、update(新旧node对象的taint不相同)事件时,node会进入该队列;
(4)podUpdateQueue:当pod对象发生add、delete、update(新旧pod对象的NodeNameTolerations不相同)事件时,pod会进入该队列;
(5)nodeUpdateChannelsnodeUpdateChannels即8个nodeUpdateItem类型的channel,有worker负责消费nodeUpdateQueue队列,然后根据node name计算出index,把node放入其中1个nodeUpdateItem类型的channel中;
(6)podUpdateChannelspodUpdateChannels即8个podUpdateItem类型的channel,有worker负责消费podUpdateQueue队列,然后根据pod的node name计算出index,把pod放入其中1个podUpdateItem类型的channel中;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go type NoExecuteTaintManager struct { 	client                clientset.Interface 	recorder              record.EventRecorder 	getPod                GetPodFunc 	getNode               GetNodeFunc 	getPodsAssignedToNode GetPodsByNodeNameFunc  	taintEvictionQueue *TimedWorkerQueue 	// keeps a map from nodeName to all noExecute taints on that Node 	taintedNodesLock sync.Mutex 	taintedNodes     map[string][]v1.Taint  	nodeUpdateChannels []chan nodeUpdateItem 	podUpdateChannels  []chan podUpdateItem  	nodeUpdateQueue workqueue.Interface 	podUpdateQueue  workqueue.Interface } 

1.2 taintEvictionQueue分析

taintEvictionQueue属性是一个TimedWorkerQueue类型的队列,调用tc.taintEvictionQueue.AddWork,会将pod添加到该队列中,会添加一个定时器,然后到期之后会自动执行workFunc,初始化taintEvictionQueue时,传入的workFuncdeletePodHandler函数,作用是删除pod;

所以进入taintEvictionQueue中的pod,会在设置好的时间,被删除;

1.3 pod.Spec.Tolerations分析

pod.Spec.Tolerations配置的是pod的污点容忍信息;

// vendor/k8s.io/api/core/v1/types.go type Toleration struct { 	Key string `json:"key,omitempty" protobuf:"bytes,1,opt,name=key"` 	Operator TolerationOperator `json:"operator,omitempty" protobuf:"bytes,2,opt,name=operator,casttype=TolerationOperator"` 	Value string `json:"value,omitempty" protobuf:"bytes,3,opt,name=value"` 	Effect TaintEffect `json:"effect,omitempty" protobuf:"bytes,4,opt,name=effect,casttype=TaintEffect"` 	TolerationSeconds *int64 `json:"tolerationSeconds,omitempty" protobuf:"varint,5,opt,name=tolerationSeconds"` } 

Tolerations的属性值解析如下:
(1)Key:匹配node污点的Key;
(2)Operator:表示Tolerations中Key与node污点的Key相同时,其Value与node污点的Value的关系,默认值Equal,代表相等,Exists则代表Tolerations中Key与node污点的Key相同即可,不用比较其Value值;
(3)Value:匹配node污点的Value;
(4)Effect:匹配node污点的Effect;
(5)TolerationSeconds:node污点容忍时间;

配置示例:

tolerations: - key: "key1"   operator: "Equal"   value: "value1"   effect: "NoExecute"   tolerationSeconds: 3600 

上述配置表示如果该pod正在运行,同时一个匹配的污点被添加到其所在的node节点上,那么该pod还将继续在节点上运行3600秒,然后会被驱逐(如果在此之前其匹配的node污点被删除了,则该pod不会被驱逐);

2.初始化分析

2.1 NewNodeLifecycleController

NewNodeLifecycleControllerNodeLifecycleController的初始化函数,里面给taintManager注册了pod与node的EventHandlerAddUpdateDelete事件都会调用taintManagerPodUpdatedNodeUpdated方法来做处理;

// pkg/controller/nodelifecycle/node_lifecycle_controller.go func NewNodeLifecycleController(     ...     podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ 		AddFunc: func(obj interface{}) { 			... 			if nc.taintManager != nil { 				nc.taintManager.PodUpdated(nil, pod) 			} 		}, 		UpdateFunc: func(prev, obj interface{}) { 			... 			if nc.taintManager != nil { 				nc.taintManager.PodUpdated(prevPod, newPod) 			} 		}, 		DeleteFunc: func(obj interface{}) { 			... 			if nc.taintManager != nil { 				nc.taintManager.PodUpdated(pod, nil) 			} 		}, 	})     ...     if nc.runTaintManager { 		podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) } 		nodeLister := nodeInformer.Lister() 		nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) } 		nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode) 		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ 			AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { 				nc.taintManager.NodeUpdated(nil, node) 				return nil 			}), 			UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error { 				nc.taintManager.NodeUpdated(oldNode, newNode) 				return nil 			}), 			DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error { 				nc.taintManager.NodeUpdated(node, nil) 				return nil 			}), 		}) 	} 	... } 

2.1.1 tc.NodeUpdated

tc.NodeUpdated方法会判断新旧node对象的taint是否相同,不相同则调用tc.nodeUpdateQueue.Add,将该node放入到nodeUpdateQueue队列中;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) { 	nodeName := "" 	oldTaints := []v1.Taint{} 	if oldNode != nil { 		nodeName = oldNode.Name 		oldTaints = getNoExecuteTaints(oldNode.Spec.Taints) 	}  	newTaints := []v1.Taint{} 	if newNode != nil { 		nodeName = newNode.Name 		newTaints = getNoExecuteTaints(newNode.Spec.Taints) 	}  	if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) { 		return 	} 	updateItem := nodeUpdateItem{ 		nodeName: nodeName, 	}  	tc.nodeUpdateQueue.Add(updateItem) } 

2.1.2 tc.PodUpdated

tc.PodUpdated方法会判断新旧pod对象的NodeNameTolerations是否相同,不相同则调用tc.podUpdateQueue.Add,将该pod放入到podUpdateQueue队列中;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) { 	podName := "" 	podNamespace := "" 	nodeName := "" 	oldTolerations := []v1.Toleration{} 	if oldPod != nil { 		podName = oldPod.Name 		podNamespace = oldPod.Namespace 		nodeName = oldPod.Spec.NodeName 		oldTolerations = oldPod.Spec.Tolerations 	} 	newTolerations := []v1.Toleration{} 	if newPod != nil { 		podName = newPod.Name 		podNamespace = newPod.Namespace 		nodeName = newPod.Spec.NodeName 		newTolerations = newPod.Spec.Tolerations 	}  	if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName { 		return 	} 	updateItem := podUpdateItem{ 		podName:      podName, 		podNamespace: podNamespace, 		nodeName:     nodeName, 	}  	tc.podUpdateQueue.Add(updateItem) } 

2.2 taintEvictionQueue

看到TaintManager的初始化方法NewNoExecuteTaintManager中,调用CreateWorkerQueuetaintEvictionQueue做了初始化;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go func NewNoExecuteTaintManager(...) ... {     ...     tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))     ... } 

CreateWorkerQueue函数初始化并返回TimedWorkerQueue结构体;

// pkg/controller/nodelifecycle/scheduler/timed_workers.go func CreateWorkerQueue(f func(args *WorkArgs) error) *TimedWorkerQueue { 	return &TimedWorkerQueue{ 		workers:  make(map[string]*TimedWorker), 		workFunc: f, 	} } 

2.2.1 deletePodHandler

初始化taintEvictionQueue时传入了deletePodHandler作为队列中元素的处理方法;deletePodHandler函数的主要逻辑是请求apiserver,删除pod对象,所以说,被放入到taintEvictionQueue队列中的pod,会被删除;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error { 	return func(args *WorkArgs) error { 		ns := args.NamespacedName.Namespace 		name := args.NamespacedName.Name 		klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String()) 		if emitEventFunc != nil { 			emitEventFunc(args.NamespacedName) 		} 		var err error 		for i := 0; i < retries; i++ { 			err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{}) 			if err == nil { 				break 			} 			time.Sleep(10 * time.Millisecond) 		} 		return err 	} } 

2.2.2 tc.taintEvictionQueue.AddWork

再来看一下tc.taintEvictionQueue.AddWork方法,作用是添加pod进入taintEvictionQueue队列,即调用CreateWorker给该pod创建一个worker来删除该pod;

// pkg/controller/nodelifecycle/scheduler/timed_workers.go func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) { 	key := args.KeyFromWorkArgs() 	klog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)  	q.Lock() 	defer q.Unlock() 	if _, exists := q.workers[key]; exists { 		klog.Warningf("Trying to add already existing work for %+v. Skipping.", args) 		return 	} 	worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key)) 	q.workers[key] = worker } 

CreateWorker函数会先判断是否应该立即执行workFunc,是的话立即拉起一个goroutine来执行workFunc并返回,否则定义一个timer定时器,到时间后自动拉起一个goroutine执行workFunc

// pkg/controller/nodelifecycle/scheduler/timed_workers.go func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker { 	delay := fireAt.Sub(createdAt) 	if delay <= 0 { 		go f(args) 		return nil 	} 	timer := time.AfterFunc(delay, func() { f(args) }) 	return &TimedWorker{ 		WorkItem:  args, 		CreatedAt: createdAt, 		FireAt:    fireAt, 		Timer:     timer, 	} } 

2.2.3 tc.taintEvictionQueue.Cancel

tc.taintEvictionQueue.AddWork方法,作用是停止对应的pod的timer,即停止执行对应pod的workFunc(不删除pod);

// pkg/controller/nodelifecycle/scheduler/timed_workers.go func (w *TimedWorker) Cancel() { 	if w != nil { 		w.Timer.Stop() 	} } 

3.核心处理逻辑分析

nc.taintManager.Run

nc.taintManager.RuntaintManager的启动方法,处理逻辑都在这,主要是判断node上的pod是否能容忍node的NoExecute污点,不能容忍的pod,会被删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,被删除;

主要逻辑:
(1)创建8个类型为nodeUpdateItem的channel(缓冲区大小10),并赋值给tc.nodeUpdateChannels
创建8个类型为podUpdateItem的channel(缓冲区大小1),并赋值给podUpdateChannels

(2)消费tc.nodeUpdateQueue队列,根据node name计算hash,将node放入对应的tc.nodeUpdateChannels[hash]中;

(3)消费tc.podUpdateQueue队列,根据pod的node name计算hash,将node放入对应的tc.podUpdateChannels[hash]中;

(4)启动8个goroutine,调用tc.worker对其中一个tc.nodeUpdateChannelstc.podUpdateChannels做处理,判断node上的pod是否能容忍node的NoExecute污点,不能容忍的pod,会被删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,被删除;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { 	klog.V(0).Infof("Starting NoExecuteTaintManager")  	for i := 0; i < UpdateWorkerSize; i++ { 		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize)) 		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize)) 	}  	// Functions that are responsible for taking work items out of the workqueues and putting them 	// into channels. 	go func(stopCh <-chan struct{}) { 		for { 			item, shutdown := tc.nodeUpdateQueue.Get() 			if shutdown { 				break 			} 			nodeUpdate := item.(nodeUpdateItem) 			hash := hash(nodeUpdate.nodeName, UpdateWorkerSize) 			select { 			case <-stopCh: 				tc.nodeUpdateQueue.Done(item) 				return 			case tc.nodeUpdateChannels[hash] <- nodeUpdate: 				// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker 			} 		} 	}(stopCh)  	go func(stopCh <-chan struct{}) { 		for { 			item, shutdown := tc.podUpdateQueue.Get() 			if shutdown { 				break 			} 			// The fact that pods are processed by the same worker as nodes is used to avoid races 			// between node worker setting tc.taintedNodes and pod worker reading this to decide 			// whether to delete pod. 			// It's possible that even without this assumption this code is still correct. 			podUpdate := item.(podUpdateItem) 			hash := hash(podUpdate.nodeName, UpdateWorkerSize) 			select { 			case <-stopCh: 				tc.podUpdateQueue.Done(item) 				return 			case tc.podUpdateChannels[hash] <- podUpdate: 				// tc.podUpdateQueue.Done is called by the podUpdateChannels worker 			} 		} 	}(stopCh)  	wg := sync.WaitGroup{} 	wg.Add(UpdateWorkerSize) 	for i := 0; i < UpdateWorkerSize; i++ { 		go tc.worker(i, wg.Done, stopCh) 	} 	wg.Wait() } 

tc.worker

tc.worker方法负责消费nodeUpdateChannelspodUpdateChannels,分别调用tc.handleNodeUpdatetc.handlePodUpdate方法做进一步处理;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) { 	defer done()  	// When processing events we want to prioritize Node updates over Pod updates, 	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible - 	// we don't want user (or system) to wait until PodUpdate queue is drained before it can 	// start evicting Pods from tainted Nodes. 	for { 		select { 		case <-stopCh: 			return 		case nodeUpdate := <-tc.nodeUpdateChannels[worker]: 			tc.handleNodeUpdate(nodeUpdate) 			tc.nodeUpdateQueue.Done(nodeUpdate) 		case podUpdate := <-tc.podUpdateChannels[worker]: 			// If we found a Pod update we need to empty Node queue first. 		priority: 			for { 				select { 				case nodeUpdate := <-tc.nodeUpdateChannels[worker]: 					tc.handleNodeUpdate(nodeUpdate) 					tc.nodeUpdateQueue.Done(nodeUpdate) 				default: 					break priority 				} 			} 			// After Node queue is emptied we process podUpdate. 			tc.handlePodUpdate(podUpdate) 			tc.podUpdateQueue.Done(podUpdate) 		} 	} } 

3.1 tc.handleNodeUpdate

tc.handleNodeUpdate方法主要是判断node上的pod是否能容忍node的NoExecute污点,不能容忍的pod,会被删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,被删除;

主要逻辑:
(1)从informer本地缓存中获取node对象;
(2)从node.Spec.Taints中获取NoExecutetaints
(3)将该node的NoExecutetaints更新到tc.taintedNodes中;
(4)调用tc.getPodsAssignedToNode,获取该node上的所有pod,如果pod数量为0,直接return;
(5)如果node的NoExecutetaints数量为0,则遍历该node上所有pod,调用tc.cancelWorkWithEvent,将该pod从taintEvictionQueue队列中移除,然后直接return;
(6)遍历该node上所有pod,调用tc.processPodOnNode,对pod做进一步处理;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) { 	node, err := tc.getNode(nodeUpdate.nodeName) 	if err != nil { 		if apierrors.IsNotFound(err) { 			// Delete 			klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName) 			tc.taintedNodesLock.Lock() 			defer tc.taintedNodesLock.Unlock() 			delete(tc.taintedNodes, nodeUpdate.nodeName) 			return 		} 		utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err)) 		return 	}  	// Create or Update 	klog.V(4).Infof("Noticed node update: %#v", nodeUpdate) 	taints := getNoExecuteTaints(node.Spec.Taints) 	func() { 		tc.taintedNodesLock.Lock() 		defer tc.taintedNodesLock.Unlock() 		klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints) 		if len(taints) == 0 { 			delete(tc.taintedNodes, node.Name) 		} else { 			tc.taintedNodes[node.Name] = taints 		} 	}()  	// This is critical that we update tc.taintedNodes before we call getPodsAssignedToNode: 	// getPodsAssignedToNode can be delayed as long as all future updates to pods will call 	// tc.PodUpdated which will use tc.taintedNodes to potentially delete delayed pods. 	pods, err := tc.getPodsAssignedToNode(node.Name) 	if err != nil { 		klog.Errorf(err.Error()) 		return 	} 	if len(pods) == 0 { 		return 	} 	// Short circuit, to make this controller a bit faster. 	if len(taints) == 0 { 		klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name) 		for i := range pods { 			tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name}) 		} 		return 	}  	now := time.Now() 	for _, pod := range pods { 		podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} 		tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now) 	} } 

3.1.1 tc.processPodOnNode

tc.processPodOnNode方法主要作用是判断pod是否能容忍node上所有的NoExecute的污点,如果不能,则将该pod加到taintEvictionQueue队列中,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,加到taintEvictionQueue队列中;

主要逻辑:
(1)如果node的NoExecutetaints数量为0,则调用tc.cancelWorkWithEvent,将该pod从taintEvictionQueue队列中移除;
(2)调用v1helper.GetMatchingTolerations,判断pod是否容忍node上所有的NoExecute的taints,以及获取能容忍taints的容忍列表;
(3)如果不能容忍所有污点,则调用tc.taintEvictionQueue.AddWork,将该pod加到taintEvictionQueue队列中;
(4)如果能容忍所有污点,则等待所有污点的容忍时间里最小值后,再调用tc.taintEvictionQueue.AddWork,将该pod加到taintEvictionQueue队列中;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) processPodOnNode( 	podNamespacedName types.NamespacedName, 	nodeName string, 	tolerations []v1.Toleration, 	taints []v1.Taint, 	now time.Time, ) { 	if len(taints) == 0 { 		tc.cancelWorkWithEvent(podNamespacedName) 	} 	allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations) 	if !allTolerated { 		klog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName) 		// We're canceling scheduled work (if any), as we're going to delete the Pod right away. 		tc.cancelWorkWithEvent(podNamespacedName) 		tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now()) 		return 	} 	minTolerationTime := getMinTolerationTime(usedTolerations) 	// getMinTolerationTime returns negative value to denote infinite toleration. 	if minTolerationTime < 0 { 		klog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String()) 		return 	}  	startTime := now 	triggerTime := startTime.Add(minTolerationTime) 	scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) 	if scheduledEviction != nil { 		startTime = scheduledEviction.CreatedAt 		if startTime.Add(minTolerationTime).Before(triggerTime) { 			return 		} 		tc.cancelWorkWithEvent(podNamespacedName) 	} 	tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime) } 

3.2 tc.handlePodUpdate

tc.handlePodUpdate方法最终也是调用了tc.processPodOnNode对pod做进一步处理;

tc.processPodOnNode方法在上面已经分析过了,这里不再进行分析;

主要逻辑:
(1)从informer本地缓存中获取pod对象;
(2)获取pod的node name,如果为空,直接return;
(3)根据node name从tc.taintedNodes中获取node的污点,如果污点为空,直接return;
(4)调用tc.processPodOnNode对pod做进一步处理;

// pkg/controller/nodelifecycle/scheduler/taint_manager.go func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) { 	pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace) 	if err != nil { 		if apierrors.IsNotFound(err) { 			// Delete 			podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName} 			klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName) 			tc.cancelWorkWithEvent(podNamespacedName) 			return 		} 		utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err)) 		return 	}  	// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object. 	if pod.Spec.NodeName != podUpdate.nodeName { 		return 	}  	// Create or Update 	podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} 	klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName) 	nodeName := pod.Spec.NodeName 	if nodeName == "" { 		return 	} 	taints, ok := func() ([]v1.Taint, bool) { 		tc.taintedNodesLock.Lock() 		defer tc.taintedNodesLock.Unlock() 		taints, ok := tc.taintedNodes[nodeName] 		return taints, ok 	}() 	// It's possible that Node was deleted, or Taints were removed before, which triggered 	// eviction cancelling if it was needed. 	if !ok { 		return 	} 	tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now()) } 

总结

taintManager的主要功能为:当某个node被打上NoExecute污点后,其上面的pod如果不能容忍该污点,则taintManager将会驱逐这些pod,而新建的pod也需要容忍该污点才能调度到该node上;

通过kcm启动参数--enable-taint-manager来确定是否启动taintManagertrue时启动(启动参数默认值为true);

kcm启动参数--feature-gates=TaintBasedEvictions=xxx,默认值true,配合--enable-taint-manager共同作用,两者均为true,才会开启污点驱逐;

kcm污点驱逐

当node出现NoExecute污点时,判断node上的pod是否能容忍node的污点,不能容忍的pod,会被立即删除,能容忍所有污点的pod,则等待所有污点的容忍时间里最小值后,pod被删除;

发表评论

评论已关闭。

相关文章