Implementing a Custom Controller with client-go



Introduction

As we already know, there are two main ways for a Service to expose itself outside the cluster: NodePort and LoadBalancer. Both have drawbacks:

  • NodePort consumes ports on the cluster's nodes; as the number of services grows, this drawback becomes increasingly painful.
  • LoadBalancer requires one load balancer per Service, which is wasteful and cumbersome, and depends on infrastructure outside Kubernetes.

Given these limitations, Kubernetes provides the Ingress resource: a single NodePort or load balancer is enough to expose multiple Services.

The client first resolves the domain name via DNS to get the IP of the node where the Ingress Controller runs, then sends an HTTP request to the Ingress Controller. The controller matches the request's host against the rules described in the Ingress objects, finds the corresponding Service, fetches its Endpoints list, and forwards the request to one of the Pods.
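For reference, a minimal Ingress manifest of the kind described above might look like this; the host and backend names are placeholders, matching the ones used later in this article:

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: my-nginx
spec:
  rules:
    - host: example.com          # requests for this host ...
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: my-nginx   # ... are routed to this Service
                port:
                  number: 80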

Implementing a Custom Controller with client-go

In this post we use client-go to implement a custom controller that checks whether a Service's annotations contain the key ingress/http: if they do, it creates a matching Ingress; if they don't, no Ingress is created, and an existing Ingress is deleted.

Implementation

First, create the project:

$ mkdir ingress-manager && cd ingress-manager
$ go mod init ingress-manager

# The controller logic is fairly large, so it goes into its own pkg directory
$ mkdir pkg

# The final project layout looks like this
.
├── go.mod
├── go.sum
├── main.go
└── pkg
    └── controller.go
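The project depends on client-go (which pulls in k8s.io/api and k8s.io/apimachinery); a command along these lines fetches it. The version here is only an example — pick one matching your cluster:

$ go get k8s.io/client-go@v0.24.2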

Next, we implement the controller:

package pkg

import (
	"context"
	"reflect"
	"time"

	apiCoreV1 "k8s.io/api/core/v1"
	netV1 "k8s.io/api/networking/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	informersCoreV1 "k8s.io/client-go/informers/core/v1"
	informersNetV1 "k8s.io/client-go/informers/networking/v1"
	"k8s.io/client-go/kubernetes"
	coreV1 "k8s.io/client-go/listers/core/v1"
	v1 "k8s.io/client-go/listers/networking/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

const (
	workNum  = 5  // number of worker goroutines
	maxRetry = 10 // maximum number of retries per key
)

// Controller watches Services and reconciles their Ingresses.
type Controller struct {
	client        kubernetes.Interface
	ingressLister v1.IngressLister
	serviceLister coreV1.ServiceLister
	queue         workqueue.RateLimitingInterface
}

// NewController initializes the controller and registers the event handlers.
func NewController(client kubernetes.Interface, serviceInformer informersCoreV1.ServiceInformer, ingressInformer informersNetV1.IngressInformer) Controller {
	c := Controller{
		client:        client,
		ingressLister: ingressInformer.Lister(),
		serviceLister: serviceInformer.Lister(),
		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),
	}

	// register the event handlers
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addService,
		UpdateFunc: c.updateService,
	})

	ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: c.deleteIngress,
	})
	return c
}

// enqueue adds the object's namespace/name key to the work queue
func (c *Controller) enqueue(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		runtime.HandleError(err)
		return
	}
	c.queue.Add(key)
}

func (c *Controller) addService(obj interface{}) {
	c.enqueue(obj)
}

func (c *Controller) updateService(oldObj, newObj interface{}) {
	// TODO: compare only the annotations
	// for now we simply compare the whole objects and return if they are identical
	if reflect.DeepEqual(oldObj, newObj) {
		return
	}
	c.enqueue(newObj)
}

func (c *Controller) deleteIngress(obj interface{}) {
	ingress := obj.(*netV1.Ingress)
	ownerReference := metaV1.GetControllerOf(ingress)
	if ownerReference == nil {
		return
	}

	// only requeue ingresses that are owned by a Service
	if ownerReference.Kind != "Service" {
		return
	}

	c.queue.Add(ingress.Namespace + "/" + ingress.Name)
}

// Run starts workNum goroutines; the real work happens in worker
func (c *Controller) Run(stopCh chan struct{}) {
	for i := 0; i < workNum; i++ {
		go wait.Until(c.worker, time.Minute, stopCh)
	}
	<-stopCh
}

func (c *Controller) worker() {
	for c.processNextItem() {
	}
}

// processNextItem is where the business logic is dispatched
func (c *Controller) processNextItem() bool {
	// fetch the next key
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	defer c.queue.Done(item)

	// invoke the business logic
	err := c.syncService(item.(string))
	if err != nil {
		// handle the error, then keep the worker running
		c.handlerError(item.(string), err)
		return true
	}
	// on success, reset the item's rate-limit history
	c.queue.Forget(item)
	return true
}

func (c *Controller) syncService(item string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(item)
	if err != nil {
		return err
	}
	// fetch the service
	service, err := c.serviceLister.Services(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// create or delete
	_, ok := service.GetAnnotations()["ingress/http"]
	ingress, err := c.ingressLister.Ingresses(namespace).Get(name)
	if err != nil && !errors.IsNotFound(err) {
		return err
	}

	if ok && errors.IsNotFound(err) {
		// annotation present but no ingress yet: create it
		ig := c.constructIngress(service)
		_, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metaV1.CreateOptions{})
		if err != nil {
			return err
		}
	} else if !ok && ingress != nil {
		// annotation absent but ingress exists: delete it
		err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metaV1.DeleteOptions{})
		if err != nil {
			return err
		}
	}
	return nil
}

func (c *Controller) handlerError(key string, err error) {
	// on error, requeue with rate limiting, at most maxRetry times
	if c.queue.NumRequeues(key) <= maxRetry {
		c.queue.AddRateLimited(key)
		return
	}
	runtime.HandleError(err)
	c.queue.Forget(key)
}

func (c *Controller) constructIngress(service *apiCoreV1.Service) *netV1.Ingress {
	// build the ingress, owned by the service
	pathType := netV1.PathTypePrefix
	ingress := netV1.Ingress{}
	ingress.ObjectMeta.OwnerReferences = []metaV1.OwnerReference{
		*metaV1.NewControllerRef(service, apiCoreV1.SchemeGroupVersion.WithKind("Service")),
	}
	ingress.Namespace = service.Namespace
	ingress.Name = service.Name
	ingress.Spec = netV1.IngressSpec{
		Rules: []netV1.IngressRule{
			{
				Host: "example.com",
				IngressRuleValue: netV1.IngressRuleValue{
					HTTP: &netV1.HTTPIngressRuleValue{
						Paths: []netV1.HTTPIngressPath{
							{
								Path:     "/",
								PathType: &pathType,
								Backend: netV1.IngressBackend{
									Service: &netV1.IngressServiceBackend{
										Name: service.Name,
										Port: netV1.ServiceBackendPort{
											Number: 80,
										},
									},
								},
							},
						},
					},
				},
			},
		},
	}

	return &ingress
}
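To sanity-check the sync logic without a real cluster, here is a minimal test sketch using client-go's fake clientset. The file name (say pkg/controller_test.go) and all test values are assumptions for illustration:

package pkg

import (
	"context"
	"testing"

	apiCoreV1 "k8s.io/api/core/v1"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func TestSyncServiceCreatesIngress(t *testing.T) {
	// a service that carries the ingress/http annotation
	svc := &apiCoreV1.Service{
		ObjectMeta: metaV1.ObjectMeta{
			Name:        "my-nginx",
			Namespace:   "default",
			Annotations: map[string]string{"ingress/http": "nginx"},
		},
	}
	client := fake.NewSimpleClientset(svc)

	factory := informers.NewSharedInformerFactory(client, 0)
	c := NewController(client, factory.Core().V1().Services(), factory.Networking().V1().Ingresses())

	// start the informers so the listers are populated
	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	// reconcile the service and expect an ingress with the same name
	if err := c.syncService("default/my-nginx"); err != nil {
		t.Fatalf("syncService failed: %v", err)
	}
	if _, err := client.NetworkingV1().Ingresses("default").Get(context.TODO(), "my-nginx", metaV1.GetOptions{}); err != nil {
		t.Fatalf("expected ingress to be created: %v", err)
	}
}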

Next, we implement main:

package main

import (
	"ingress-manager/pkg"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// get the config:
	// try the kubeconfig outside the cluster first, fall back to the in-cluster config
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		clusterConfig, err := rest.InClusterConfig()
		if err != nil {
			panic(err)
		}
		config = clusterConfig
	}

	// create the clientSet from the config
	clientSet, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// create the informers from the client; the event handlers are registered in NewController
	factory := informers.NewSharedInformerFactory(clientSet, 0)
	serviceInformer := factory.Core().V1().Services()
	ingressInformer := factory.Networking().V1().Ingresses()
	newController := pkg.NewController(clientSet, serviceInformer, ingressInformer)

	// start the informers, then the controller
	stopCh := make(chan struct{})
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
	newController.Run(stopCh)
}
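With a reachable cluster and a kubeconfig at the default location (~/.kube/config), the controller can be started directly from the project root; running it inside the cluster would additionally need RBAC rules allowing it to watch services and manage ingresses:

$ go run main.go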

Testing

First, create the deployment and service:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-nginx
spec:
  selector:
    matchLabels:
      app: my-nginx
  template:
    metadata:
      labels:
        app: my-nginx
    spec:
      containers:
        - name: my-nginx
          image: nginx:1.17.1
          ports:
            - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: my-nginx
  labels:
    app: my-nginx
spec:
  ports:
    - port: 80
      protocol: TCP
      name: http
  selector:
    app: my-nginx
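Assuming the manifest above is saved as my-nginx.yaml (the file name is arbitrary), apply it with:

$ kubectl apply -f my-nginx.yaml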

After creating them, check the resources:

$ kubectl get deploy,service,ingress
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-nginx          1/1     1            1           7m

NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78d
service/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    7m

The command above asks for deploy, service, and ingress, but only the deploy and the service come back, which matches our expectation. Next we add ingress/http: nginx to the annotations of service/my-nginx:

$ kubectl edit service/my-nginx

apiVersion: v1
kind: Service
metadata:
  annotations:
    ingress/http: nginx
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"labels":{"app":"my-nginx"},"name":"my-nginx","namespace":"default"},"spec":{"ports":[{"name":"http","port":80,"protocol":"TCP"}],"selector":{"app":"my-nginx"}}}
      ......

service/my-nginx edited
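Editing the object by hand works, but kubectl annotate achieves the same in one line, and a trailing dash removes the annotation again later:

$ kubectl annotate service my-nginx ingress/http=nginx
$ kubectl annotate service my-nginx ingress/http-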

Check again:

$ kubectl get deploy,service,ingress
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/demo-deployment   1/1     1            1           41d
deployment.apps/my-nginx          1/1     1            1           11m

NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78d
service/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    11m

NAME                                 CLASS    HOSTS         ADDRESS   PORTS   AGE
ingress.networking.k8s.io/my-nginx   <none>   example.com             80      19s
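The new ingress should also carry an owner reference pointing back at the service — this is what the delete handler keys off. An optional way to inspect it:

$ kubectl get ingress my-nginx -o yaml | grep -A 6 ownerReferences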

Next, let's test the other direction: remove the ingress/http: nginx annotation and see whether the ingress is deleted automatically:

$ kubectl get deploy,service,ingress
NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/demo-deployment   1/1     1            1           41d
deployment.apps/my-nginx          1/1     1            1           19m

NAME                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)   AGE
service/kubernetes   ClusterIP   10.96.0.1      <none>        443/TCP   78d
service/my-nginx     ClusterIP   10.105.32.46   <none>        80/TCP    19m

The ingress is gone, exactly as we expected.

If the service itself is deleted, the ingress cannot survive either: constructIngress sets an owner reference pointing at the service, so Kubernetes' garbage collector removes the ingress along with its owner. We won't demonstrate that here; feel free to try it yourself.
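For completeness, the check would look something like this; garbage collection may take a moment, and the final line is the expected (not captured) output:

$ kubectl delete service my-nginx
$ kubectl get ingress
No resources found in default namespace.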

