最近在倒腾“AI大模型基础设施”, 宏观目标是做一个基于云原生的AI算力平台,目前因公司隐私暂不能公开宏观背景和技术方案, 姑且记录实践中遇到的一些技能点。

前文已经记录了第1步: 使用arena 提交训练任务的实践。
今天我们记录聊一聊平台侧另一个核心能力:
动态纳管云原生k8s集群,并监听AI/ML训练任务的状态变化。也就是上图的第4步。
作为面向算法开发者的云原生saas平台,平台在界面上提供了纳管集群的交互入口,平台启动后会去监听pytorch,mpi训练任务的状态变更,并回显到界面(并给开发者发送飞书变更通知)。
本文核心关注:
- 如何动态纳管k8s集群
- 如何重建k8s informer监听
这里我提供我的实践, 请看下图:

- 程序启动,加载初始k8s集群,informer监听训练任务状态,并绑定informer停止信号(stopCh)、重建信号(rebuildCh)
package main import "k8s.io/client-go/rest" type StartInformerFunc func(clusterId string, restConf *rest.Config) (stopCh chan struct{}, err error) type InformerManager struct { clusterConfigs map[string]string gvr map[string]StartInformerFunc stopCh chan struct{} // informer 需要用到 rebuildCh chan struct{} } var InformerManagerInstance *InformerManager func NewInFormerManager() *InformerManager { InformerManagerInstance = &InformerManager{ clusterConfigs: map[string]string{ "id1": "kubeconfig1", "id2": "kubeconfig2", }, gvr: map[string]StartInformerFunc{ "pytorchjob": startPytorchjobInformer, "mpijob": startMpijobInformer, "job": startRawjobInformer, }, stopCh: make(chan struct{}), rebuildCh: make(chan struct{}, 1), } return InformerManagerInstance }
- 开协程定时任务去轮循落盘的待纳管k8s集群记录
- 考虑纳管的k8s集群数据可控,变更时机可控,采用md5校验的方式判断是否发生集群变更
下面的k8s.CheckClusterChanged(mgr.clusterConfigs) 是利用对kube-configs做md5, 前后对比判断集群是否发生变更。
func (mgr *InformerManager) monitorcLusterChanged() bool { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: if k8s.CheckClusterChanged(mgr.clusterConfigs) { fmt.Println("cluster changed") mgr.rebuildCh <- struct{}{} } else { fmt.Println("cluster no change") } case <-mgr.rebuildCh: fmt.Println("rebuild informer") mgr.clusterConfigs = k8s.GetInitclusters() for k, v := range mgr.clusterConfigs { rc, err := k8s.ConvertToRestConfig([]byte(v)) if err != nil { fmt.Println("convert to rest config failed") continue } for gvr, StartInformerFunc := range mgr.gvr { go func(k string, rc *rest.Config) { newStopch, err := StartInformerFunc(k, rc) if err != nil { return } if mgr.stopCh != nil { close(mgr.stopCh) } mgr.stopCh = newStopch fmt.Printf("%s informer started for cluster %s n", gvr, k) }(k, rc) } } } } }
- 利用简单的链表指针,重置informer监听信道
func (mgr *InformerManager) Run() { for k, v := range mgr.clusterConfigs { rc, err := k8s.ConvertToRestConfig([]byte(v)) if err != nil { fmt.Println("failed to convert kubeconfig to rest config") continue } for gvr, StartInformerFunc := range mgr.gvr { go func(k string, rc *rest.Config) { newStopch, err := StartInformerFunc(k, rc) if err != nil { return } if mgr.stopCh != nil { close(mgr.stopCh) } mgr.stopCh = newStopch fmt.Printf("start %s informer for cluster %s n", gvr,k) }(k, rc) } go mgr.monitorClusterChanged() } }
本文记录了使用定时任务感知资源变更,并利用golang信道作为变更信号的姿势,可作为golang中动态感知资源变化的常规技能实践。