Overview
Kubernetes 默认调度器在调度 Pod 时并不关心特殊资源例如磁盘、GPU 等,因此突发奇想来改造调度器,在翻阅官方调度器框架[1]、调度器配置[2]和参考大佬的文章[3]后,自己也来尝试改写一下。
环境配置
相关软件版本:
- Kubernetes 版本:v1.19.0
- Docker 版本:v26.1.2
- Prometheus 版本:v2.49
- Node Exporter 版本:v1.7.0
集群内有 1 个 master 和 3 个 node。
实验部分
项目总览
项目结构如下:
.
├── Dockerfile
├── deployment.yaml
├── go.mod
├── go.sum
├── main.go
├── pkg
│   ├── cpu
│   │   └── cputraffic.go
│   ├── disk
│   │   └── disktraffic.go
│   ├── diskspace
│   │   └── diskspacetraffic.go
│   ├── memory
│   │   └── memorytraffic.go
│   ├── network
│   │   └── networktraffic.go
│   └── prometheus.go
├── scheduler
├── scheduler.conf
└── scheduler.yaml
插件部分
下面以构建内存插件为例。
定义插件名称、变量和结构体
const MemoryPlugin = "MemoryTraffic" var _ = framework.ScorePlugin(&MemoryTraffic{}) type MemoryTraffic struct { prometheus *pkg.PrometheusHandle handle framework.FrameworkHandle }
下面来实现 framework.ScorePlugin 接口。
先定义插件初始化入口
func New(plArgs runtime.Object, h framework.FrameworkHandle) (framework.Plugin, error) { args := &MemoryTrafficArgs{} if err := fruntime.DecodeInto(plArgs, args); err != nil { return nil, err } klog.Infof("[MemoryTraffic] args received. Device: %s; TimeRange: %d, Address: %s", args.DeviceName, args.TimeRange, args.IP) return &MemoryTraffic{ handle: h, prometheus: pkg.NewProme(args.IP, args.DeviceName, time.Minute*time.Duration(args.TimeRange)), }, nil }
实现 Score 接口,Score 进行初步打分
func (n *MemoryTraffic) Score(ctx context.Context, state *framework.CycleState, p *corev1.Pod, nodeName string) (int64, *framework.Status) { nodeBandwidth, err := n.prometheus.MemoryGetGauge(nodeName) if err != nil { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("error getting node bandwidth measure: %s", err)) } bandWidth := int64(nodeBandwidth.Value) klog.Infof("[MemoryTraffic] node '%s' bandwidth: %v", nodeName, bandWidth) return bandWidth, nil }
实现 NormalizeScore,对上一步 Score 的打分进行修正
func (n *MemoryTraffic) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, scores framework.NodeScoreList) *framework.Status { var higherScore int64 for _, node := range scores { if higherScore < node.Score { higherScore = node.Score } } // 计算公式为,满分 - (当前内存使用 / 总内存 * 100) // 公式的计算结果为,内存使用率越大的节点,分数越低 for i, node := range scores { scores[i].Score = node.Score * 100 / higherScore klog.Infof("[MemoryTraffic] Nodes final score: %v", scores[i].Score) } klog.Infof("[MemoryTraffic] Nodes final score: %v", scores) return nil }
配置插件名称和返回 ScoreExtension
// Name returns the plugin's registered name, MemoryPlugin.
func (n *MemoryTraffic) Name() string {
	return MemoryPlugin
}

// ScoreExtensions returns the plugin itself. Returning a non-nil
// framework.ScoreExtensions means the type must implement that interface,
// which MemoryTraffic does through NormalizeScore.
func (n *MemoryTraffic) ScoreExtensions() framework.ScoreExtensions {
	return n
}
Prometheus 部分
首先来编写查询内存可用率的 PromQL
// memoryMeasureQueryTemplate is the PromQL expression used to read a node's
// memory availability as a percentage: the 30-minute average of
// node_memory_MemAvailable_bytes divided by the 30-minute average of
// node_memory_MemTotal_bytes, times 100. The result is joined on "instance"
// with node_uname_info so it can be filtered by node name; the single %s verb
// is replaced with that node name.
//
// NOTE(review): the 30m window is hard-coded in the expression and does not
// come from the configured timeRange.
const memoryMeasureQueryTemplate = ` (avg_over_time(node_memory_MemAvailable_bytes[30m]) / avg_over_time(node_memory_MemTotal_bytes[30m])) * 100 * on(instance) group_left(nodename) (node_uname_info{nodename="%s"})`
然后来声明 PrometheusHandle
type PrometheusHandle struct { deviceName string timeRange time.Duration ip string client v1.API }
另外在插件部分也要声明查询 Prometheus 的参数结构体
// MemoryTrafficArgs holds the plugin arguments decoded from the scheduler
// configuration's pluginConfig entry.
type MemoryTrafficArgs struct {
	// IP is the address of the Prometheus server (config key "ip").
	IP string `json:"ip"`
	// DeviceName is the device name (config key "deviceName").
	DeviceName string `json:"deviceName"`
	// TimeRange is the time window in minutes (config key "timeRange").
	TimeRange int `json:"timeRange"`
}
编写初始化 Prometheus 插件入口
func NewProme(ip, deviceName string, timeRace time.Duration) *PrometheusHandle { client, err := api.NewClient(api.Config{Address: ip}) if err != nil { klog.Fatalf("[Prometheus Plugin] FatalError creating prometheus client: %s", err.Error()) } return &PrometheusHandle{ deviceName: deviceName, ip: ip, timeRange: timeRace, client: v1.NewAPI(client), } }
编写通用查询接口,可供其他类型资源查询
func (p *PrometheusHandle) query(promQL string) (model.Value, error) { results, warnings, err := p.client.Query(context.Background(), promQL, time.Now()) if len(warnings) > 0 { klog.Warningf("[Prometheus Query Plugin] Warnings: %vn", warnings) } return results, err }
获取内存可用率接口
func (p *PrometheusHandle) MemoryGetGauge(node string) (*model.Sample, error) { value, err := p.query(fmt.Sprintf(memoryMeasureQueryTemplate, node)) fmt.Println(fmt.Sprintf(memoryMeasureQueryTemplate, node)) if err != nil { return nil, fmt.Errorf("[MemoryTraffic Plugin] Error querying prometheus: %w", err) } nodeMeasure := value.(model.Vector) if len(nodeMeasure) != 1 { return nil, fmt.Errorf("[MemoryTraffic Plugin] Invalid response, expected 1 value, got %d", len(nodeMeasure)) } return nodeMeasure[0], nil }
然后在程序入口里启用插件并执行
func main() { rand.Seed(time.Now().UnixNano()) command := app.NewSchedulerCommand( app.WithPlugin(network.NetworkPlugin, network.New), app.WithPlugin(disk.DiskPlugin, disk.New), app.WithPlugin(diskspace.DiskSpacePlugin, diskspace.New), app.WithPlugin(cpu.CPUPlugin, cpu.New), app.WithPlugin(memory.MemoryPlugin, memory.New), ) // 对于外部注册一个plugin // command := app.NewSchedulerCommand( // app.WithPlugin("example-plugin1", ExamplePlugin1.New)) if err := command.Execute(); err != nil { fmt.Fprintf(os.Stderr, "%vn", err) os.Exit(1) } }
配置部分
为方便观察,这里使用二进制方式运行,准备运行时的配置文件
# Scheduler configuration for the custom scheduler binary.
apiVersion: kubescheduler.config.k8s.io/v1beta1
kind: KubeSchedulerConfiguration
clientConnection:
  # kubeconfig the scheduler uses to reach the API server.
  kubeconfig: /etc/kubernetes/scheduler.conf
profiles:
  # Workloads select this profile through spec.schedulerName.
  - schedulerName: custom-scheduler
    plugins:
      score:
        # Custom score plugins and their relative weights.
        enabled:
          - name: "CPUTraffic"
            weight: 3
          - name: "MemoryTraffic"
            weight: 4
          - name: "DiskSpaceTraffic"
            weight: 3
          - name: "NetworkTraffic"
            weight: 2
        # "*" disables all default score plugins.
        disabled:
          - name: "*"
    # Per-plugin arguments; they are decoded into each plugin's args struct.
    pluginConfig:
      - name: "NetworkTraffic"
        args:
          ip: "http://172.19.32.140:9090"
          deviceName: "eth0"
          timeRange: 60
      - name: "CPUTraffic"
        args:
          ip: "http://172.19.32.140:9090"
          deviceName: "eth0"
          timeRange: 0
      - name: "MemoryTraffic"
        args:
          ip: "http://172.19.32.140:9090"
          deviceName: "eth0"
          timeRange: 0
      - name: "DiskSpaceTraffic"
        args:
          ip: "http://172.19.32.140:9090"
          deviceName: "eth0"
          timeRange: 0
kubeconfig 处为 master 节点的 scheduler.conf,以实际路径为准,内包含集群的证书哈希;ip 为部署 Prometheus 节点的 ip,端口为 Prometheus 配置中对外暴露的端口。
将二进制文件和 scheduler.yaml 放至 master 同一目录下运行:
./scheduler --logtostderr=true --address=127.0.0.1 --v=6 --config=`pwd`/scheduler.yaml --kubeconfig="/etc/kubernetes/scheduler.conf"
验证结果
准备一个要部署的 Deployment,并在其 Pod 模板中使用指定的调度器名称
# Test Deployment whose Pods are scheduled by the custom scheduler.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: gin
  namespace: default
  labels:
    app: gin
spec:
  replicas: 2
  selector:
    matchLabels:
      app: gin
  template:
    metadata:
      labels:
        app: gin
    spec:
      # Must match profiles[].schedulerName in the scheduler configuration
      # ("custom-scheduler"); with any other name the custom scheduler never
      # picks these Pods up and they stay Pending.
      schedulerName: custom-scheduler
      containers:
        - name: gin
          image: jaydenchang/k8s_test:latest
          imagePullPolicy: Always
          command: ["./app"]
          ports:
            - containerPort: 9999
              protocol: TCP
最后可以查看调度器日志,部分日志如下:
I0808 17:32:35.138289 27131 memorytraffic.go:83] [MemoryTraffic] node 'node1' bandwidth: %!s(int64=2680340) I0808 17:32:35.138763 27131 memorytraffic.go:70] [MemoryTraffic] Nodes final score: [{node1 2680340} {node2 0}] I0808 17:32:35.138851 27131 memorytraffic.go:70] [MemoryTraffic] Nodes final score: [{node1 71} {node2 0}] I0808 17:32:35.138911 27131 memorytraffic.go:73] [MemoryTraffic] Nodes final score: [{node1 71} {node2 0}] I0808 17:32:35.139565 27131 default_binder.go:51] Attempting to bind default/go-deployment-66878c4885-b4b7k to node1 I0808 17:32:35.141114 27131 eventhandlers.go:225] add event for scheduled pod default/go-deployment-66878c4885-b4b7k I0808 17:32:35.141714 27131 eventhandlers.go:205] delete event for unscheduled pod default/go-deployment-66878c4885-b4b7k I0808 17:32:35.143504 27131 scheduler.go:609] "Successfully bound pod to node" pod="default/go-deployment-66878c4885-b4b7k" node="no de1" evaluatedNodes=2 feasibleNodes=2 I0808 17:32:35.104540 27131 scheduler.go:609] "Successfully bound pod to node" pod="default/go-deployment-66878c4885-b4b7k" node="no de1" evaluatedNodes=2 feasibleNodes=2