Go动态感知资源变更的技术实践,你指定用过!

最近在倒腾“AI大模型基础设施”, 宏观目标是做一个基于云原生的AI算力平台,目前因公司隐私暂不能公开宏观背景和技术方案, 姑且记录实践中遇到的一些技能点。

Go动态感知资源变更的技术实践,你指定用过!

前文已经记录了第1步: 使用arena 提交训练任务的实践。

今天我们记录聊一聊平台侧另一个核心能力:
动态纳管云原生k8s集群,并监听AI/ML训练任务的状态变化。也就是上图的第4步。


作为面向算法开发者的云原生saas平台,平台在界面上提供了纳管集群的交互入口,平台启动后会去监听pytorch,mpi训练任务的状态变更,并回显到界面(并给开发者发送飞书变更通知)。

本文核心关注:

  • 如何动态纳管k8s集群
  • 如何重建k8s informer监听

这里我提供我的实践, 请看下图:

Go动态感知资源变更的技术实践,你指定用过!

  1. 程序启动,加载初始k8s集群,informer监听训练任务状态,并绑定informer停止信号(stopCh)、重建信号(rebuildCh)
package main  import "k8s.io/client-go/rest"  type StartInformerFunc func(clusterId string, restConf *rest.Config) (stopCh chan struct{}, err error) type InformerManager struct { 	clusterConfigs map[string]string 	gvr            map[string]StartInformerFunc 	stopCh         chan struct{}  // informer 需要用到 	rebuildCh      chan struct{} }  var InformerManagerInstance *InformerManager  func NewInFormerManager() *InformerManager { 	InformerManagerInstance = &InformerManager{ 		clusterConfigs: map[string]string{ 			"id1": "kubeconfig1", 			"id2": "kubeconfig2", 		}, 		gvr: map[string]StartInformerFunc{ 		   "pytorchjob": startPytorchjobInformer, 		   "mpijob": startMpijobInformer, 		   "job": startRawjobInformer, 		}, 		stopCh:    make(chan struct{}), 		rebuildCh: make(chan struct{}, 1), 	} 	return InformerManagerInstance }  
  1. 开协程定时任务去轮循落盘的待纳管k8s集群记录
  2. 考虑纳管的k8s集群数据可控,变更时机可控,采用md5校验的方式判断是否发生集群变更

下面的k8s.CheckClusterChanged(mgr.clusterConfigs) 是利用对kube-configs做md5, 前后对比判断集群是否发生变更。

func (mgr *InformerManager) monitorcLusterChanged() bool { 	ticker := time.NewTicker(30 * time.Second) 	defer ticker.Stop() 	for { 		select { 		case <-ticker.C: 			if k8s.CheckClusterChanged(mgr.clusterConfigs) { 				fmt.Println("cluster changed") 				mgr.rebuildCh <- struct{}{} 			} else { 				fmt.Println("cluster no change") 			} 		case <-mgr.rebuildCh: 			fmt.Println("rebuild informer") 			mgr.clusterConfigs = k8s.GetInitclusters() 			for k, v := range mgr.clusterConfigs { 				rc, err := k8s.ConvertToRestConfig([]byte(v)) 				if err != nil { 					fmt.Println("convert to rest config failed") 					continue 				} 				for gvr, StartInformerFunc := range mgr.gvr { 					go func(k string, rc *rest.Config) { 						newStopch, err := StartInformerFunc(k, rc) 						if err != nil { 							return 						} 						if mgr.stopCh != nil { 							close(mgr.stopCh) 						} 						mgr.stopCh = newStopch 						fmt.Printf("%s informer started for cluster %s  n", gvr, k) 					}(k, rc) 				} 			} 		} 	} } 
  1. 利用简单的链表指针,重置informer监听信道
func (mgr *InformerManager) Run() { 	for k, v := range mgr.clusterConfigs { 		rc, err := k8s.ConvertToRestConfig([]byte(v)) 		if err != nil { 			fmt.Println("failed to convert kubeconfig to rest config") 			continue 		} 		for gvr, StartInformerFunc := range mgr.gvr { 			go func(k string, rc *rest.Config) { 				newStopch, err := StartInformerFunc(k, rc) 				if err != nil { 					return 				} 				if mgr.stopCh != nil { 					close(mgr.stopCh) 				} 				mgr.stopCh = newStopch 				fmt.Printf("start %s informer for cluster %s  n", gvr,k) 			}(k, rc) 		} 		go mgr.monitorClusterChanged() 	} } 

本文记录了使用定时任务感知资源变更,并利用golang信道作为变更信号的姿势,可作为golang中动态感知资源变化的常规技能实践。

发表评论

评论已关闭。

相关文章