The kube-scheduler Scheduling Context

The Scheduler struct

Scheduler is the central structure of kube-scheduler; it holds the components that kube-scheduler needs at runtime.

type Scheduler struct {
    // Cache is an abstraction that caches Pod information for the scheduler to
    // look up; its operations are keyed by Pod.
    Cache internalcache.Cache

    // Extenders are scheduling extensions that can influence the scheduling decisions made by Kubernetes.
    Extenders []framework.Extender

    // NextPod blocks until it can return the next schedulable Pod.
    NextPod func() *framework.QueuedPodInfo

    // Error is called if there is an error. It is passed the pod in
    // question, and the error
    Error func(*framework.QueuedPodInfo, error)

    // SchedulePod tries to schedule the given Pod onto a Node.
    SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

    // StopEverything is the signal used to shut the scheduler down.
    StopEverything <-chan struct{}

    // SchedulingQueue holds the Pods waiting to be scheduled.
    SchedulingQueue internalqueue.SchedulingQueue

    // Profiles holds the scheduling frameworks, one per profile.
    Profiles profile.Map

    client clientset.Interface

    nodeInfoSnapshot *internalcache.Snapshot

    percentageOfNodesToScore int32

    nextStartNodeIndex int
}

The two components that do the actual work are SchedulingQueue and scheduleOne; the rest of this section analyzes both.

SchedulingQueue

With the kube-scheduler initialization process understood, the next step is to analyze its overall structure and workflow.

Run 中,运行的是 一个 SchedulingQueue 与 一个 scheduleOne ,从结构上看是属于 Scheduler

func (sched *Scheduler) Run(ctx context.Context) {
    sched.SchedulingQueue.Run()

    // We need to start scheduleOne loop in a dedicated goroutine,
    // because scheduleOne function hangs on getting the next item
    // from the SchedulingQueue.
    // If there are no new pods to schedule, it will be hanging there
    // and if done in this goroutine it will be blocking closing
    // SchedulingQueue, in effect causing a deadlock on shutdown.
    go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

    <-ctx.Done()
    sched.SchedulingQueue.Close()
}

SchedulingQueue is a queue abstraction that stores the Pods waiting to be scheduled. The interface follows a pattern similar to cache.FIFO and cache.Heap.

type SchedulingQueue interface {
    framework.PodNominator
    Add(pod *v1.Pod) error
    // Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.
    // The passed-in pods are originally compiled from plugins that want to activate Pods,
    // by injecting the pods through a reserved CycleState struct (PodsToActivate).
    Activate(pods map[string]*v1.Pod)
    // AddUnschedulableIfNotPresent re-queues a Pod that could not be scheduled.
    AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
    // SchedulingCycle returns the current number of scheduling cycle which is
    // cached by scheduling queue. Normally, incrementing this number whenever
    // a pod is popped (e.g. called Pop()) is enough.
    SchedulingCycle() int64
    // Pop removes the head of the priority queue and returns it.
    Pop() (*framework.QueuedPodInfo, error)
    Update(oldPod, newPod *v1.Pod) error
    Delete(pod *v1.Pod) error
    MoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)
    AssignedPodAdded(pod *v1.Pod)
    AssignedPodUpdated(pod *v1.Pod)
    PendingPods() []*v1.Pod
    // Close closes the SchedulingQueue so that the goroutine which is
    // waiting to pop items can exit gracefully.
    Close()
    // Run starts the goroutines managing the queue.
    Run()
}

PriorityQueue is the implementation of SchedulingQueue. Its core consists of two sub-queues and one data structure: activeQ, backoffQ and unschedulablePods (a simplified struct sketch follows the list below).

  • activeQ: a heap-based priority queue from which the scheduler pops the highest-priority Pod to schedule
  • backoffQ: also a heap-based priority queue, holding Pods that failed to schedule and are backing off
  • unschedulablePods: holds Pods that have been determined to be unschedulable
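These three live inside the PriorityQueue struct. A trimmed-down sketch of that struct (field list abridged and comments mine; see pkg/scheduler/internal/queue/scheduling_queue.go for the full definition):

// Abridged sketch of PriorityQueue - not the complete upstream definition.
type PriorityQueue struct {
    framework.PodNominator

    lock sync.RWMutex
    cond sync.Cond // signalled whenever a Pod lands in activeQ, waking Pop()

    // activeQ: heap ordered by the QueueSort plugin; Pop() blocks on it.
    activeQ *heap.Heap
    // podBackoffQ: heap ordered by the time each Pod's backoff expires.
    podBackoffQ *heap.Heap
    // unschedulablePods: map of Pods found unschedulable, keyed by pod name.
    unschedulablePods *UnschedulablePods

    schedulingCycle  int64
    moveRequestCycle int64

    podInitialBackoffDuration         time.Duration
    podMaxBackoffDuration             time.Duration
    podMaxInUnschedulablePodsDuration time.Duration
}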

When the scheduler is constructed (New), you can see this queue being initialized:

podQueue := internalqueue.NewSchedulingQueue(
    // the less function used to compare two Pods when ordering the queue
    profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
    informerFactory,
    internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
    internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
    internalqueue.WithPodNominator(nominator),
    internalqueue.WithClusterEventMap(clusterEventMap),
    internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)

NewSchedulingQueue in turn initializes the PriorityQueue:

// NewSchedulingQueue initializes a priority queue as a new scheduling queue. func NewSchedulingQueue( 	lessFn framework.LessFunc, 	informerFactory informers.SharedInformerFactory, 	opts ...Option) SchedulingQueue { 	return NewPriorityQueue(lessFn, informerFactory, opts...) }  // NewPriorityQueue creates a PriorityQueue object. func NewPriorityQueue( 	lessFn framework.LessFunc, 	informerFactory informers.SharedInformerFactory, 	opts ...Option, ) *PriorityQueue { 	options := defaultPriorityQueueOptions 	for _, opt := range opts { 		opt(&options) 	} 	// 这个就是 less函数,作为打分的一部分 	comp := func(podInfo1, podInfo2 interface{}) bool { 		pInfo1 := podInfo1.(*framework.QueuedPodInfo) 		pInfo2 := podInfo2.(*framework.QueuedPodInfo) 		return lessFn(pInfo1, pInfo2) 	}  	if options.podNominator == nil { 		options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister()) 	}  	pq := &PriorityQueue{ 		PodNominator:                      options.podNominator, 		clock:                             options.clock, 		stop:                              make(chan struct{}), 		podInitialBackoffDuration:         options.podInitialBackoffDuration, 		podMaxBackoffDuration:             options.podMaxBackoffDuration, 		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, 		activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), 		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()), 		moveRequestCycle:                  -1, 		clusterEventMap:                   options.clusterEventMap, 	} 	pq.cond.L = &pq.lock 	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) 	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()  	return pq } 

With the queue structure understood, the next question is where Pods are enqueued and dequeued. During initialization, event handlers (addEventHandlerFuncs, the same concept used by controllers) are registered, injecting three handler functions; the AddFunc is where Pods get enqueued.

The handlers are registered on the Pod informer, and the registered function addPodToSchedulingQueue is what enqueues the Pod:

Handler: cache.ResourceEventHandlerFuncs{
    AddFunc:    sched.addPodToSchedulingQueue,
    UpdateFunc: sched.updatePodInSchedulingQueue,
    DeleteFunc: sched.deletePodFromSchedulingQueue,
},

func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
    pod := obj.(*v1.Pod)
    klog.V(3).InfoS("Add event for unscheduled pod", "pod", klog.KObj(pod))
    if err := sched.SchedulingQueue.Add(pod); err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
    }
}

The SchedulingQueue implementation here is PriorityQueue, and its Add operates on activeQ:

func (p *PriorityQueue) Add(pod *v1.Pod) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    // Build the QueuedPodInfo for this pod: it wraps the v1.Pod together with the
    // initial-attempt and creation timestamps and, later on, the names of the
    // plugins that reported the pod unschedulable.
    pInfo := p.newQueuedPodInfo(pod)
    // Push it onto activeQ.
    if err := p.activeQ.Add(pInfo); err != nil {
        klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pod))
        return err
    }
    if p.unschedulablePods.get(pod) != nil {
        klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))
        p.unschedulablePods.delete(pod)
    }
    // Delete pod from backoffQ if it is backing off
    if err := p.podBackoffQ.Delete(pInfo); err == nil {
        klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))
    }
    metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
    p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    p.cond.Broadcast()

    return nil
}

In the Scheduler struct above there is a NextPod field; NextPod pops a Pod from the queue. When the scheduler is constructed, MakeNextPodFunc is passed in as this NextPod:

func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
    return func() *framework.QueuedPodInfo {
        podInfo, err := queue.Pop()
        if err == nil {
            klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
            for plugin := range podInfo.UnschedulablePlugins {
                metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
            }
            return podInfo
        }
        klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
        return nil
    }
}

This queue.Pop() is PriorityQueue's Pop(), which acts as the consumer side of activeQ:

func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
    p.lock.Lock()
    defer p.lock.Unlock()
    for p.activeQ.Len() == 0 {
        // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
        // When Close() is called, the p.closed is set and the condition is broadcast,
        // which causes this loop to continue and return from the Pop().
        if p.closed {
            return nil, fmt.Errorf(queueClosed)
        }
        p.cond.Wait()
    }
    obj, err := p.activeQ.Pop()
    if err != nil {
        return nil, err
    }
    pInfo := obj.(*framework.QueuedPodInfo)
    pInfo.Attempts++
    p.schedulingCycle++
    return pInfo, nil
}

As seen in the entry point above, scheduleOne is what consumes a Pod: it calls NextPod, which is the MakeNextPodFunc passed in at initialization, which in turn brings us back to Pop as the consumer.

scheduleOne is the workflow that schedules a single Pod.

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
    if err != nil {
        // This shouldn't happen, because we only accept for scheduling the pods
        // which specify a scheduler name that matches one of the profiles.
        klog.ErrorS(err, "Error occurred")
        return
    }
    if sched.skipPodSchedule(fwk, pod) {
        return
    }
...

The scheduling context

Figure 1: The Pod scheduling context
Source: https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework

With the Scheduler structure covered, the next step is to walk through the scheduling context and see how the extension points work. This is where the scheduling-context diagram from the official documentation (Figure 1) comes in.

The scheduling framework [2]

The scheduling framework (SF) is a pluggable architecture that Kubernetes designed for the scheduler. The SF turns the scheduler into a plugin-style API, and the scheduling policies mentioned in the previous chapter are implemented as plugins.

The SF defines a set of extension points (EP). A scheduling plugin registers at one or more EPs; in other words, a plugin registered at multiple EPs gets called at each of those EPs during execution.

Each scheduling attempt is split into two phases: the scheduling cycle (SC) and the binding cycle (BC).

  • The SC selects a node for the Pod; scheduling cycles run serially.
  • The BC applies the SC's decision to the cluster; binding cycles may run concurrently.

Together, the scheduling cycle and binding cycle are called the scheduling context; Figure 1 above shows its workflow, and a condensed code-level outline follows the note below.

Note: if the decision is that no node is available for the Pod, or an internal error occurs, the SC or BC is aborted and the Pod is re-queued for retry.
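
Mapped onto the code, the two cycles correspond roughly to the condensed outline below (a sketch only: error handling and metrics are omitted, and the outline's function name is invented for illustration; the real scheduleOne is walked through later in this article):

// Condensed sketch of the scheduling context - not the complete upstream code.
func (sched *Scheduler) scheduleOneOutline(ctx context.Context) {
    podInfo := sched.NextPod()                   // consume a Pod from activeQ
    fwk, _ := sched.frameworkForPod(podInfo.Pod) // pick the framework (profile) for the Pod
    state := framework.NewCycleState()

    // --- scheduling cycle (runs serially) ---
    result, err := sched.SchedulePod(ctx, fwk, state, podInfo.Pod) // PreFilter / Filter / PreScore / Score
    if err != nil {
        return // PostFilter (preemption) and handleSchedulingFailure re-queue the Pod
    }
    assumed := podInfo.Pod.DeepCopy()
    sched.assume(assumed, result.SuggestedHost)                             // optimistic cache update
    fwk.RunReservePluginsReserve(ctx, state, assumed, result.SuggestedHost) // Reserve
    fwk.RunPermitPlugins(ctx, state, assumed, result.SuggestedHost)         // Permit

    // --- binding cycle (runs asynchronously) ---
    go func() {
        fwk.WaitOnPermit(ctx, assumed)
        fwk.RunPreBindPlugins(ctx, state, assumed, result.SuggestedHost)
        sched.bind(ctx, fwk, assumed, result.SuggestedHost, state)
        fwk.RunPostBindPlugins(ctx, state, assumed, result.SuggestedHost)
    }()
}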

Extension points [3]

Extension points are the extensible APIs within the scheduling context, shown in Figure 1. Among them, Filter corresponds to the old Predicates, and Scoring corresponds to Priority.

The scheduling phase goes through the following extension points:

  • Sort: this plugin sorts the pending Pods waiting in the scheduling queue. Only one queue-sort plugin can be enabled at a time.

  • preFilter: pre-processes or checks information about the Pod or the cluster before filtering. A failure here terminates the scheduling attempt.

  • filter: the equivalent of the old Predicates in the scheduling context; it excludes nodes that cannot run the Pod. Filters are called in the configured order, and as soon as one filter marks a node as infeasible, the remaining filters are not evaluated for that node; if no node survives, the Pod is unschedulable. (A minimal plugin sketch follows this list.)

  • postFilter: called, in the configured order, only when no feasible node (FN) was found for the Pod. If any postFilter plugin marks the Pod as Schedulable, the remaining plugins are not called. In other words, this step is skipped when filtering succeeds.

  • preScore: used for preparatory work before scoring (an informational extension point).

  • score: scores every node that made it through the filter phase; the scheduler then selects the node with the highest weighted sum of scores.

  • reserve: because binding happens asynchronously, this plugin prevents the node from being oversubscribed by Pods scheduled while the current Pod is still waiting to bind. If a later phase errors or fails, Unreserve is triggered to roll back (an informational extension point). Reserve is also the last state of the scheduling cycle: either the Pod eventually makes it to postBind, or a failure triggers Unreserve.

  • permit: can block or delay the binding of a Pod. This step generally does one of three things:

    • Approve: the scheduler continues with the binding process
    • Deny: if any Permit plugin denies the Pod-to-node binding, Unreserve is triggered and the Pod is re-queued
    • Wait: if a Permit plugin returns Wait, the Pod stays in an internal list of waiting Pods until it is approved. If the wait times out, Wait becomes Deny, the Pod is put back into the scheduling queue, and Unreserve is triggered to roll back.
  • preBind: performs the work required before the Pod is bound. For example, a preBind plugin might provision a network volume and mount it on the target node. If any plugin in this step returns an error, the Pod is denied and placed back into the scheduling queue.

  • bind: after all preBind work has completed, this plugin binds the Pod to the node; the plugins registered at this step are called in order, and once one of them handles the binding, the remaining plugins are skipped.

  • postBind: called after the Pod has been bound; can be used to clean up related resources (an informational extension point).

  • multiPoint: a configuration-only field that enables or disables plugins for all applicable extension points at once.
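
Each extension point corresponds to an interface in the framework package that plugins implement. As a purely illustrative example, a hypothetical Filter plugin (the interface shape follows framework.FilterPlugin; the plugin itself and its label keys are made up) could look roughly like this:

// Hypothetical Filter plugin - not part of upstream kube-scheduler.
type zoneFilter struct{}

func (z *zoneFilter) Name() string { return "ZoneFilter" }

// Filter is called once per candidate node at the filter extension point;
// returning a non-Success status removes that node from consideration for this Pod.
func (z *zoneFilter) Filter(ctx context.Context, state *framework.CycleState,
    pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    if nodeInfo.Node().Labels["topology.kubernetes.io/zone"] == pod.Labels["required-zone"] {
        return nil // a nil status is treated as Success
    }
    return framework.NewStatus(framework.Unschedulable, "node is not in the required zone")
}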

In the code, the scheduler's implementation of the scheduling context is scheduleOne; the rest of this section walks through it extension point by extension point.

Sort

The Sort plugin sorts the pending Pods waiting in the scheduling queue. Only one queue-sort plugin can be enabled at a time.

After entering scheduleOne, NextPod gets a Pod from activeQ, and frameworkForPod looks up the framework (profile) for that Pod. The ordering that decided which Pod comes out of the queue first is the job of the first extension point, sort.

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    podInfo := sched.NextPod()
    // pod could be nil when schedulerQueue is closed
    if podInfo == nil || podInfo.Pod == nil {
        return
    }
    pod := podInfo.Pod
    fwk, err := sched.frameworkForPod(pod)
...

func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
    // look up the profile specified by the Pod
    fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
    if !ok {
        return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
    }
    return fwk, nil
}

Recall that this sort function is wired up when the scheduler is constructed:

podQueue := internalqueue.NewSchedulingQueue(
    profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
    informerFactory,
    internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
    internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
    internalqueue.WithPodNominator(nominator),
    internalqueue.WithClusterEventMap(clusterEventMap),
    internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
)
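
QueueSortFunc() returns the Less function of the enabled QueueSort plugin. The default PrioritySort plugin orders Pods by priority first and, on ties, by the time they entered the queue; its comparison is roughly:

// Roughly what the default PrioritySort queue-sort plugin does: higher priority
// first, ties broken by the earlier queue timestamp.
func less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {
    p1 := corev1helpers.PodPriority(pInfo1.Pod)
    p2 := corev1helpers.PodPriority(pInfo2.Pod)
    return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}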

preFilter

preFilter, the first extension point of the scheduling cycle proper, pre-processes or checks information about the Pod or the cluster before filtering; a failure here terminates the scheduling attempt.

func (sched *Scheduler) scheduleOne(ctx context.Context) { 	podInfo := sched.NextPod() 	// pod could be nil when schedulerQueue is closed 	if podInfo == nil || podInfo.Pod == nil { 		return 	} 	pod := podInfo.Pod 	fwk, err := sched.frameworkForPod(pod) 	if err != nil { 		// This shouldn't happen, because we only accept for scheduling the pods 		// which specify a scheduler name that matches one of the profiles. 		klog.ErrorS(err, "Error occurred") 		return 	} 	if sched.skipPodSchedule(fwk, pod) { 		return 	}  	klog.V(3).InfoS("Attempting to schedule pod", "pod", klog.KObj(pod))  	// Synchronously attempt to find a fit for the pod. 	start := time.Now() 	state := framework.NewCycleState() 	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent) 	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty. 	podsToActivate := framework.NewPodsToActivate() 	state.Write(framework.PodsToActivateKey, podsToActivate)  	schedulingCycleCtx, cancel := context.WithCancel(ctx) 	defer cancel()     // 这里将进入prefilter 	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod) 

schedulePod tries to schedule the given Pod onto one of the nodes in the node list; on success it returns the name of the node.

func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)
    // bring the snapshot of the cache up to date
    if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if sched.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }
    // run the filtering extension points to find the nodes that fit the Pod
    feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)

    ...

findNodesThatFitPod runs the corresponding filtering plugins to find the best-fitting nodes; both the comments and the method names make it clear that this is where the plugins run 😁. The algorithms themselves are analyzed later; here the focus is on the workflow.

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) { 	diagnosis := framework.Diagnosis{ 		NodeToStatusMap:      make(framework.NodeToStatusMap), 		UnschedulablePlugins: sets.NewString(), 	}  	// Run "prefilter" plugins. 	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod) 	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List() 	if err != nil { 		return nil, diagnosis, err 	} 	if !s.IsSuccess() { 		if !s.IsUnschedulable() { 			return nil, diagnosis, s.AsError() 		} 		// All nodes will have the same status. Some non trivial refactoring is 		// needed to avoid this copy. 		for _, n := range allNodes { 			diagnosis.NodeToStatusMap[n.Node().Name] = s 		} 		// Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins. 		if s.FailedPlugin() != "" { 			diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin()) 		} 		return nil, diagnosis, nil 	}  	// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption. 	// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes. 	if len(pod.Status.NominatedNodeName) > 0 { 		feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis) 		if err != nil { 			klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName) 		} 		// Nominated node passes all the filters, scheduler is good to assign this node to the pod. 		if len(feasibleNodes) != 0 { 			return feasibleNodes, diagnosis, nil 		} 	}  	nodes := allNodes 	if !preRes.AllNodes() { 		nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames)) 		for n := range preRes.NodeNames { 			nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n) 			if err != nil { 				return nil, diagnosis, err 			} 			nodes = append(nodes, nInfo) 		} 	} 	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes) 	if err != nil { 		return nil, diagnosis, err 	}  	feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap) 	if err != nil { 		return nil, diagnosis, err 	} 	return feasibleNodes, diagnosis, nil } 

filter

The filter plugins are the equivalent of the old Predicates in the scheduling context: they exclude nodes that cannot run the Pod. Filters are called in the configured order, and as soon as one filter marks a node as infeasible, the remaining filters are not evaluated for that node.

In the code, filter still lives inside findNodesThatFitPod: findNodesThatPassFilters is what produces the FNs (feasible nodes), and that step is the filter extension point.

func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    ...

    feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, nodes)
    if err != nil {
        return nil, diagnosis, err
    }

    feasibleNodes, err = findNodesThatPassExtenders(sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
    if err != nil {
        return nil, diagnosis, err
    }
    return feasibleNodes, diagnosis, nil
}
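
Conceptually, findNodesThatPassFilters runs every Filter plugin against each candidate node and keeps only the nodes for which all filters return Success. A heavily simplified, sequential sketch of that idea (the upstream version fans the checks out across goroutines and may stop early once enough feasible nodes have been found, which is where percentageOfNodesToScore comes in):

// Simplified illustration of the filter step - the real implementation runs in
// parallel and can stop early after collecting "enough" feasible nodes.
func filterNodes(ctx context.Context, fwk framework.Framework, state *framework.CycleState,
    pod *v1.Pod, nodes []*framework.NodeInfo) []*v1.Node {
    var feasible []*v1.Node
    for _, nodeInfo := range nodes {
        status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
        if status.IsSuccess() {
            feasible = append(feasible, nodeInfo.Node())
        }
    }
    return feasible
}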

Postfilter

postFilter plugins are called, in the configured order, only when no FN was found for the Pod. If any postFilter plugin marks the Pod as Schedulable, the remaining plugins are not called. In other words, this step is skipped when filtering succeeds; let's verify that in the code 😊.

Still in scheduleOne: after SchedulePod finishes (successfully or not) it returns an err, and postFilter runs or is skipped based on that err, which matches the official description.

scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod) 	if err != nil { 		// SchedulePod() may have failed because the pod would not fit on any host, so we try to 		// preempt, with the expectation that the next time the pod is tried for scheduling it 		// will fit due to the preemption. It is also possible that a different pod will schedule 		// into the resources that were preempted, but this is harmless. 		var nominatingInfo *framework.NominatingInfo 		if fitError, ok := err.(*framework.FitError); ok { 			if !fwk.HasPostFilterPlugins() { 				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed") 			} else { 				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle. 				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap) 				if status.Code() == framework.Error { 					klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status) 				} else { 					fitError.Diagnosis.PostFilterMsg = status.Message() 					klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status) 				} 				if result != nil { 					nominatingInfo = result.NominatingInfo 				} 			} 			// Pod did not fit anywhere, so it is counted as a failure. If preemption 			// succeeds, the pod should get counted as a success the next time we try to 			// schedule it. (hopefully) 			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) 		} else if err == ErrNoNodesAvailable { 			nominatingInfo = clearNominatedNode 			// No nodes available is counted as unschedulable rather than an error. 			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) 		} else { 			nominatingInfo = clearNominatedNode 			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod)) 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) 		} 		sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo) 		return 	} 

PreScore,Score

preScore is used for preparatory scoring work and is an informational extension point. Once filtering completes, the flow moves straight on to the preScore plugins rather than returning; if any of the configured plugins returns a failure, the Pod is rejected.

 func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { 	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name}) 	defer trace.LogIfLong(100 * time.Millisecond)  	if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil { 		return result, err 	} 	trace.Step("Snapshotting scheduler cache and node infos done")  	if sched.nodeInfoSnapshot.NumNodes() == 0 { 		return result, ErrNoNodesAvailable 	}  	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod) 	if err != nil { 		return result, err 	} 	trace.Step("Computing predicates done")  	if len(feasibleNodes) == 0 { 		return result, &framework.FitError{ 			Pod:         pod, 			NumAllNodes: sched.nodeInfoSnapshot.NumNodes(), 			Diagnosis:   diagnosis, 		} 	}  	// When only one node after predicate, just use it. 	if len(feasibleNodes) == 1 { 		return ScheduleResult{ 			SuggestedHost:  feasibleNodes[0].Name, 			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap), 			FeasibleNodes:  1, 		}, nil 	} 	// 这里会完成prescore,score 	priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes) 	if err != nil { 		return result, err 	}  	host, err := selectHost(priorityList) 	trace.Step("Prioritizing done")  	return ScheduleResult{ 		SuggestedHost:  host, 		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap), 		FeasibleNodes:  len(feasibleNodes), 	}, err } 

prioritizeNodes scores the nodes with the configured plugins and returns a score for each node: the results of the individual plugins are summed per node, giving each node its final weighted total score.
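
As a small worked example with made-up plugin names and numbers, before looking at the full function below: suppose two score plugins are enabled and the framework has already applied their weights when RunScorePlugins returns; prioritizeNodes then just sums the per-plugin scores for each node.

// Made-up numbers, only to illustrate the aggregation done by prioritizeNodes.
scoresMap := map[string]framework.NodeScoreList{
    "NodeResourcesFit": {{Name: "node-a", Score: 80}, {Name: "node-b", Score: 95}},
    "ImageLocality":    {{Name: "node-a", Score: 50}, {Name: "node-b", Score: 60}},
}
totals := map[string]int64{}
for _, nodeScores := range scoresMap {
    for _, ns := range nodeScores {
        totals[ns.Name] += ns.Score
    }
}
// totals: node-a = 130, node-b = 155, so selectHost would pick node-b.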

func prioritizeNodes( 	ctx context.Context, 	extenders []framework.Extender, 	fwk framework.Framework, 	state *framework.CycleState, 	pod *v1.Pod, 	nodes []*v1.Node, ) (framework.NodeScoreList, error) { 	// If no priority configs are provided, then all nodes will have a score of one. 	// This is required to generate the priority list in the required format 	if len(extenders) == 0 && !fwk.HasScorePlugins() { 		result := make(framework.NodeScoreList, 0, len(nodes)) 		for i := range nodes { 			result = append(result, framework.NodeScore{ 				Name:  nodes[i].Name, 				Score: 1, 			}) 		} 		return result, nil 	}  	// Run PreScore plugins. 	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes) 	if !preScoreStatus.IsSuccess() { 		return nil, preScoreStatus.AsError() 	}  	// Run the Score plugins. 	scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes) 	if !scoreStatus.IsSuccess() { 		return nil, scoreStatus.AsError() 	}  	// Additional details logged at level 10 if enabled. 	klogV := klog.V(10) 	if klogV.Enabled() { 		for plugin, nodeScoreList := range scoresMap { 			for _, nodeScore := range nodeScoreList { 				klogV.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score) 			} 		} 	}  	// Summarize all scores. 	result := make(framework.NodeScoreList, 0, len(nodes))  	for i := range nodes { 		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) 		for j := range scoresMap { 			result[i].Score += scoresMap[j][i].Score 		} 	}  	if len(extenders) != 0 && nodes != nil { 		var mu sync.Mutex 		var wg sync.WaitGroup 		combinedScores := make(map[string]int64, len(nodes)) 		for i := range extenders { 			if !extenders[i].IsInterested(pod) { 				continue 			} 			wg.Add(1) 			go func(extIndex int) { 				metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc() 				defer func() { 					metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec() 					wg.Done() 				}() 				prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes) 				if err != nil { 					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities 					klog.V(5).InfoS("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name()) 					return 				} 				mu.Lock() 				for i := range *prioritizedList { 					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score 					if klogV.Enabled() { 						klogV.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", host, "score", score) 					} 					combinedScores[host] += score * weight 				} 				mu.Unlock() 			}(i) 		} 		// wait for all go routines to finish 		wg.Wait() 		for i := range result { 			// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore, 			// therefore we need to scale the score returned by extenders to the score range used by the scheduler. 			result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority) 		} 	}  	if klogV.Enabled() { 		for i := range result { 			klogV.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score) 		} 	} 	return result, nil } 

Reserve

Reserve exists because binding happens asynchronously: it prevents the node from being oversubscribed by Pods scheduled while the current Pod is still waiting to bind. If a later phase errors or fails, Unreserve is triggered to roll back (an informational extension point). Reserve is also the last state of the scheduling cycle: either the Pod eventually makes it to postBind, or a failure triggers Unreserve.

// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
    // Reserve did not succeed
    metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
    // trigger un-reserve to clean up the state associated with the Pod
    fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
        klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
    }
    sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, sts.AsError(), SchedulerError, clearNominatedNode)
    return
}
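
The Reserve/Unreserve pair corresponds to the framework's ReservePlugin interface, which has roughly the following shape (simplified; see the framework package for the authoritative definition):

// Roughly the shape of the framework's ReservePlugin interface.
type ReservePlugin interface {
    framework.Plugin
    // Reserve is called when resources for the Pod are reserved on the chosen node.
    Reserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status
    // Unreserve rolls the reservation back; it is informational and cannot fail the cycle.
    Unreserve(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string)
}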

permit

Permit plugins can block or delay the binding of a Pod.

// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
    var reason string
    if runPermitStatus.IsUnschedulable() {
        metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
        reason = v1.PodReasonUnschedulable
    } else {
        metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
        reason = SchedulerError
    }
    // as soon as one plugin returns a status other than success or wait,
    // trigger un-reserve and
    fwk.RunReservePluginsUnreserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    // forget the pod from the cache
    if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil {
        klog.ErrorS(forgetErr, "Scheduler cache ForgetPod failed")
    }
    sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, runPermitStatus.AsError(), reason, clearNominatedNode)
    return
}
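
A Permit plugin implements the framework's PermitPlugin interface and answers with approve (a Success status), deny (an Unschedulable or Error status), or Wait together with a timeout. A hypothetical example that holds Pods of a made-up "batch" group for up to ten seconds might look roughly like this:

// Hypothetical Permit plugin - not part of upstream kube-scheduler.
type batchGate struct{}

func (b *batchGate) Name() string { return "BatchGate" }

// Permit returns a status and a timeout. Returning Wait keeps the Pod in the
// framework's waiting-Pods list until something calls Allow/Reject on it or the
// timeout expires, which counts as a deny and triggers Unreserve.
func (b *batchGate) Permit(ctx context.Context, state *framework.CycleState,
    p *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
    if p.Labels["scheduling-group"] == "batch" {
        return framework.NewStatus(framework.Wait, "waiting for the rest of the batch"), 10 * time.Second
    }
    return nil, 0 // a nil status means approve immediately
}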

Binding Cycle

Once the FN has been chosen, the scheduler does an assumed (optimistic) binding and updates the cache with it, and then goes on to perform the real bind operation, which is the binding cycle.
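
The "assumed binding" is the assume step that runs just before the binding cycle starts: it writes the chosen node into a copy of the Pod and tells the cache to treat the Pod as already placed, roughly as follows:

// Roughly what the scheduler's assume step does before the binding cycle starts.
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // Optimistically assume the bind will succeed, so the node's cached
    // resources are reduced immediately and later scheduling cycles see it.
    assumed.Spec.NodeName = host

    if err := sched.Cache.AssumePod(assumed); err != nil {
        klog.ErrorS(err, "Scheduler cache AssumePod failed")
        return err
    }
    // Once assumed, the Pod no longer needs a "nominated node" entry.
    if sched.SchedulingQueue != nil {
        sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    }
    return nil
}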

func (sched *Scheduler) scheduleOne(ctx context.Context) { 	...     ... 	// binding cycle 是一个异步的操作,这里表现就是go协程 	go func() { 		bindingCycleCtx, cancel := context.WithCancel(ctx) 		defer cancel() 		metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Inc() 		defer metrics.SchedulerGoroutines.WithLabelValues(metrics.Binding).Dec() 		// 运行WaitOnPermit插件,如果失败则,unReserve回滚 		waitOnPermitStatus := fwk.WaitOnPermit(bindingCycleCtx, assumedPod) 		if !waitOnPermitStatus.IsSuccess() { 			var reason string 			if waitOnPermitStatus.IsUnschedulable() { 				metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) 				reason = v1.PodReasonUnschedulable 			} else { 				metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) 				reason = SchedulerError 			} 			// trigger un-reserve plugins to clean up state associated with the reserved Pod 			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) 			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil { 				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed") 			} else { 				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, 				// as the assumed Pod had occupied a certain amount of resources in scheduler cache. 				// TODO(#103853): de-duplicate the logic. 				// Avoid moving the assumed Pod itself as it's always Unschedulable. 				// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would 				// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways. 				defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, func(pod *v1.Pod) bool { 					return assumedPod.UID != pod.UID 				}) 			} 			sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, waitOnPermitStatus.AsError(), reason, clearNominatedNode) 			return 		}  	// 运行Prebind 插件 		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) 		if !preBindStatus.IsSuccess() { 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) 			// trigger un-reserve plugins to clean up state associated with the reserved Pod 			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) 			if forgetErr := sched.Cache.ForgetPod(assumedPod); forgetErr != nil { 				klog.ErrorS(forgetErr, "scheduler cache ForgetPod failed") 			} else { 				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, 				// as the assumed Pod had occupied a certain amount of resources in scheduler cache. 				// TODO(#103853): de-duplicate the logic. 				
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil) 			} 			sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, preBindStatus.AsError(), SchedulerError, clearNominatedNode) 			return 		} 		// bind是真正的绑定操作 		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state) 		if err != nil { 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) 			// 如果失败了就触发 un-reserve plugins  			fwk.RunReservePluginsUnreserve(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) 			if err := sched.Cache.ForgetPod(assumedPod); err != nil { 				klog.ErrorS(err, "scheduler cache ForgetPod failed") 			} else { 				// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, 				// as the assumed Pod had occupied a certain amount of resources in scheduler cache. 				// TODO(#103853): de-duplicate the logic. 				sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(internalqueue.AssignedPodDelete, nil) 			} 			sched.handleSchedulingFailure(ctx, fwk, assumedPodInfo, fmt.Errorf("binding rejected: %w", err), SchedulerError, clearNominatedNode) 			return 		} 		// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. 		klog.V(2).InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes) 		metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start)) 		metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts)) 		metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))  		// 运行 "postbind" 插件         // 是通知性的扩展点,该插件在绑定 Pod 后调用,可用于清理相关资源()。 		fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)  		// At the end of a successful binding cycle, move up Pods if needed. 		if len(podsToActivate.Map) != 0 { 			sched.SchedulingQueue.Activate(podsToActivate.Map) 			// Unlike the logic in scheduling cycle, we don't bother deleting the entries 			// as `podsToActivate.Map` is no longer consumed. 		} 	}() } 

The failure path in the scheduling context

Everything above covered the happy path; next we analyze how failed attempts are retried. Failure handling in the scheduler involves the backoffQ and unschedulablePods fields from the structure above:

  • backoffQ: also a heap-based priority queue, holding Pods that failed to schedule and are backing off
  • unschedulablePods: a map holding Pods that have been determined to be unschedulable

backoffQ and unschedulablePods are set up when the scheduler is initialized:

func NewPriorityQueue( 	lessFn framework.LessFunc, 	informerFactory informers.SharedInformerFactory, 	opts ...Option, ) *PriorityQueue { 	options := defaultPriorityQueueOptions 	for _, opt := range opts { 		opt(&options) 	}  	comp := func(podInfo1, podInfo2 interface{}) bool { 		pInfo1 := podInfo1.(*framework.QueuedPodInfo) 		pInfo2 := podInfo2.(*framework.QueuedPodInfo) 		return lessFn(pInfo1, pInfo2) 	}  	if options.podNominator == nil { 		options.podNominator = NewPodNominator(informerFactory.Core().V1().Pods().Lister()) 	}  	pq := &PriorityQueue{ 		PodNominator:                      options.podNominator, 		clock:                             options.clock, 		stop:                              make(chan struct{}), 		podInitialBackoffDuration:         options.podInitialBackoffDuration, 		podMaxBackoffDuration:             options.podMaxBackoffDuration, 		podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration, 		activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()), 		unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder()), 		moveRequestCycle:                  -1, 		clusterEventMap:                   options.clusterEventMap, 	} 	pq.cond.L = &pq.lock     // 初始化backoffQ     // NewWithRecorder作为一个可选的 metricRecorder 的 Heap 对象。     // podInfoKeyFunc是一个函数,返回错误与字符串     // pq.podsCompareBackoffCompleted 比较两个pod的回退时间,如果第一个在第二个之前为true,     // 反之 false 	pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder()) 	pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()  	return pq } 

Two functions matter for backoffQ: getBackoffTime and calculateBackoffDuration.

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
    duration := p.calculateBackoffDuration(podInfo)
    backoffTime := podInfo.Timestamp.Add(duration)
    return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
    duration := p.podInitialBackoffDuration
    for i := 1; i < podInfo.Attempts; i++ {
        // Use subtraction instead of addition or multiplication to avoid overflow.
        if duration > p.podMaxBackoffDuration-duration {
            return p.podMaxBackoffDuration
        }
        duration += duration
    }
    return duration
}
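
So the backoff doubles with each attempt and is capped at the maximum. With the default 1s initial / 10s maximum backoff (podInitialBackoffSeconds / podMaxBackoffSeconds), a stand-alone illustration of the resulting durations:

// Stand-alone illustration of the doubling-with-cap backoff computed above.
func backoffFor(attempts int, initial, max time.Duration) time.Duration {
    d := initial
    for i := 1; i < attempts; i++ {
        if d > max-d {
            return max
        }
        d += d
    }
    return d
}

// With initial = 1s and max = 10s, attempts 1..6 give 1s, 2s, 4s, 8s, 10s, 10s.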

The overall failure flow works as follows: when the scheduler is initialized, an Error function is registered to handle unschedulable Pods; the function actually registered is MakeDefaultErrorFunc, and it is what gets invoked as Error.

sched := newScheduler(
    schedulerCache,
    extenders,
    internalqueue.MakeNextPodFunc(podQueue),
    MakeDefaultErrorFunc(client, podLister, podQueue, schedulerCache),
    stopEverything,
    podQueue,
    profiles,
    client,
    snapshot,
    options.percentageOfNodesToScore,
)

Within the scheduling cycle, i.e. scheduleOne, every extension-point failure ends up calling handleSchedulingFailure, and that function uses the registered Error function to handle the Pod.

func (sched *Scheduler) scheduleOne(ctx context.Context) { 	... 	defer cancel() 	scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod) 	if err != nil {  		var nominatingInfo *framework.NominatingInfo 		if fitError, ok := err.(*framework.FitError); ok { 			if !fwk.HasPostFilterPlugins() { 				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed") 			} else { 			 				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap) 				if status.Code() == framework.Error { 					klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status) 				} else { 					fitError.Diagnosis.PostFilterMsg = status.Message() 					klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status) 				} 				if result != nil { 					nominatingInfo = result.NominatingInfo 				} 			} 	 			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) 		} else if err == ErrNoNodesAvailable { 			nominatingInfo = clearNominatedNode 			// No nodes available is counted as unschedulable rather than an error. 			metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start)) 		} else { 			nominatingInfo = clearNominatedNode 			klog.ErrorS(err, "Error selecting node for pod", "pod", klog.KObj(pod)) 			metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start)) 		}         // 处理不可调度Pod 		sched.handleSchedulingFailure(ctx, fwk, podInfo, err, v1.PodReasonUnschedulable, nominatingInfo) 		return 	}  

This brings us to the registered Error function, MakeDefaultErrorFunc:

func MakeDefaultErrorFunc(client clientset.Interface, podLister corelisters.PodLister, podQueue internalqueue.SchedulingQueue, schedulerCache internalcache.Cache) func(*framework.QueuedPodInfo, error) { 	return func(podInfo *framework.QueuedPodInfo, err error) { 		pod := podInfo.Pod 		if err == ErrNoNodesAvailable { 			klog.V(2).InfoS("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod)) 		} else if fitError, ok := err.(*framework.FitError); ok { 			// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently. 			podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins 			klog.V(2).InfoS("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", err) 		} else if apierrors.IsNotFound(err) { 			klog.V(2).InfoS("Unable to schedule pod, possibly due to node not found; waiting", "pod", klog.KObj(pod), "err", err) 			if errStatus, ok := err.(apierrors.APIStatus); ok && errStatus.Status().Details.Kind == "node" { 				nodeName := errStatus.Status().Details.Name 				// when node is not found, We do not remove the node right away. Trying again to get 				// the node and if the node is still not found, then remove it from the scheduler cache. 				_, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) 				if err != nil && apierrors.IsNotFound(err) { 					node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}} 					if err := schedulerCache.RemoveNode(&node); err != nil { 						klog.V(4).InfoS("Node is not found; failed to remove it from the cache", "node", node.Name) 					} 				} 			} 		} else { 			klog.ErrorS(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod)) 		}  		// Check if the Pod exists in informer cache. 		cachedPod, err := podLister.Pods(pod.Namespace).Get(pod.Name) 		if err != nil { 			klog.InfoS("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", err) 			return 		}  		// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler. 		// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version. 		if len(cachedPod.Spec.NodeName) != 0 { 			klog.InfoS("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName) 			return 		}  		// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here. 		podInfo.PodInfo = framework.NewPodInfo(cachedPod.DeepCopy())         // 添加到unschedulable队列中 		if err := podQueue.AddUnschedulableIfNotPresent(podInfo, podQueue.SchedulingCycle()); err != nil { 			klog.ErrorS(err, "Error occurred") 		} 	} } 

Next comes AddUnschedulableIfNotPresent, which is where backoffQ and unschedulablePods are actually manipulated.

AddUnschedulableIfNotPresent inserts the unschedulable Pod into a queue, unless it is already queued. Normally PriorityQueue puts an unschedulable Pod into unschedulablePods, but if a move request has happened recently, the Pod goes into podBackoffQ instead.

func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
    p.lock.Lock()
    defer p.lock.Unlock()
    pod := pInfo.Pod
    // do nothing if the pod is already present
    if p.unschedulablePods.get(pod) != nil {
        return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))
    }
    // check whether it is already in activeQ
    if _, exists, _ := p.activeQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))
    }
    // check whether it is already in podBackoffQ
    if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {
        return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))
    }

    // on re-add, refresh the Pod's timestamp to the time of this operation
    pInfo.Timestamp = p.clock.Now()

    for plugin := range pInfo.UnschedulablePlugins {
        metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()
    }
    // if a move request was received, put the pod into podBackoffQ
    if p.moveRequestCycle >= podSchedulingCycle {
        if err := p.podBackoffQ.Add(pInfo); err != nil {
            return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
        }
        metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()
    } else {
        // otherwise put it into unschedulablePods
        p.unschedulablePods.addOrUpdate(pInfo)
        metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()
    }

    p.PodNominator.AddNominatedPod(pInfo.PodInfo, nil)
    return nil
}

When the scheduler starts, two goroutine loops are started asynchronously to work these two queues, which shows up in Run():

func (p *PriorityQueue) Run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

flushBackoffQCompleted services backoffQ, while flushUnschedulablePodsLeftover services unschedulablePods.

flushBackoffQCompleted moves every Pod whose backoff has completed from backoffQ to activeQ:

func (p *PriorityQueue) flushBackoffQCompleted() {
    p.lock.Lock()
    defer p.lock.Unlock()
    broadcast := false
    for {
        // peek at the head of the heap without popping it
        rawPodInfo := p.podBackoffQ.Peek()
        if rawPodInfo == nil {
            break
        }
        pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
        boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
        if boTime.After(p.clock.Now()) {
            break
        }
        _, err := p.podBackoffQ.Pop() // pop it
        if err != nil {
            klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
            break
        }
        p.activeQ.Add(rawPodInfo) // move it to the active queue
        metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
        broadcast = true
    }

    if broadcast {
        p.cond.Broadcast()
    }
}

flushUnschedulablePodsLeftover moves Pods that have stayed in unschedulablePods longer than podMaxInUnschedulablePodsDuration to backoffQ or activeQ.

podMaxInUnschedulablePodsDuration comes from the configuration; when nothing is passed in, the Deprecated options apply and it defaults to 5 minutes.

func NewOptions() *Options {
    o := &Options{
        SecureServing:  apiserveroptions.NewSecureServingOptions().WithLoopback(),
        Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(),
        Authorization:  apiserveroptions.NewDelegatingAuthorizationOptions(),
        Deprecated: &DeprecatedOptions{
            PodMaxInUnschedulablePodsDuration: 5 * time.Minute,
        },

flushUnschedulablePodsLeftover itself is just a time comparison followed by moving the Pods to the appropriate queue:

func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
    p.lock.Lock()
    defer p.lock.Unlock()

    var podsToMove []*framework.QueuedPodInfo
    currentTime := p.clock.Now()
    for _, pInfo := range p.unschedulablePods.podInfoMap {
        lastScheduleTime := pInfo.Timestamp
        if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
            podsToMove = append(podsToMove, pInfo)
        }
    }

    if len(podsToMove) > 0 {
        p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
    }
}

Summary of the scheduling-context flow

  • Building a scheduler goes through the following steps:
    • prepare the cache, informers, queue, error-handling function, and so on
    • register event handlers that watch resources (such as Pods); when something changes, the corresponding handler fires, and that is where Pods are enqueued into activeQ
  • After construction the scheduler is run; Run starts the SchedulingQueue, which drives the queues for Pods that cannot be scheduled yet:
    • backoffQ
    • unschedulablePods
    • these queues are drained periodically, at the intervals registered at startup, and their Pods are moved back into activeQ
  • Run also starts the scheduleOne loop, which executes all the extension points of the scheduling context and is the consumer side of activeQ:
    • scheduleOne takes a Pod
    • it runs the extension points; on failure, the Error function MakeDefaultErrorFunc adds the Pod to the unschedulable queues
    • which loops back to the flush loops above, where the Pod eventually reaches activeQ again

Reference

[1] kubernetes scheduler extender
[2] scheduling framework
[3] Extension points
