【源码阅读】fasthttp 的 server.Shutdown() 究竟能不能实现 graceful shutdown

作者:张富春(ahfuzhang),转载时请注明作者和引用链接,谢谢!


因为一个基于 fasthttp 的服务在发布上出现了一定量的 503 错误,由此怀疑 fasthttp 可能没有很好地实现 graceful shutdown.
本文将通过源码阅读的方式,推导 fasthttp 实现 graceful shutdown 的细节。

1. 业务代码中的 graceful shutdown 实现方法

func main(){      // ...     // 容器退出前会先发送 SIGTERM 信号 	sigs := make(chan os.Signal, 1) 	signal.Notify(sigs, syscall.SIGHUP, 		syscall.SIGINT, 		syscall.SIGTERM, 		syscall.SIGQUIT)     // 	server := &fasthttp.Server{ 		Handler: httphandle.FastHTTPHandler, 	} 	go func() { 		log.Info("Server start on", cfg.HTTP.Addr) 		if err := server.ListenAndServe(cfg.HTTP.Addr); err != nil { 			log.Fatal(err.Error()) 		} 		log.Info("Http Server Ended") 	}() 	// server.CloseOnShutdown = true 	<-sigs  // 一旦收到信号,就开始进入退出流程 	log.Info("Ready to shutdown...") 	// 避免缓存中的日志没有刷到磁盘 	log.Close() 	_ = server.Shutdown()  // 这一行是实现 graceful shutdown 的关键 } 

2. Shutdown() 函数的实现细节

func (s *Server) ShutdownWithContext(ctx context.Context) (err error) { 	s.mu.Lock() 	defer s.mu.Unlock() 	// stop 标志置 1     // stop 标志主要影响 tcp 长连接中,是否在一个连接上处理多次请求和响应后 —— 如果遇到服务关停的消息,如何决定长连接上的后续行为 	atomic.StoreInt32(&s.stop, 1) 	defer atomic.StoreInt32(&s.stop, 0)  	if s.ln == nil { 		return nil 	} 	// 所有监听的端口进行 close     // 关闭监听的端口后,意味着新的 tcp 连接将不再 accept 	for _, ln := range s.ln { 		if err = ln.Close(); err != nil { 			return err 		} 	}     // done 是一个用于信号通知的 channel     // done 用于影响 RequestCtx 端的服务关停消息     // 例如:业务代码如果做很长时间的处理,可以通过 reqCtx.Done() 来判断服务是否开始关停 	if s.done != nil { 		close(s.done)  // 调用后,业务代码中就可以通过 reqCtx.Done() 知道服务开始关停了 	}  	// Closing the listener will make Serve() call Stop on the worker pool. 	// Setting .stop to 1 will make serveConn() break out of its loop. 	// Now we just have to wait until all workers are done or timeout. 	ticker := time.NewTicker(time.Millisecond * 100) 	defer ticker.Stop() END: 	for { 		// 主动关闭空闲连接        // 只有当前没有请求和响应的空闲长连接,才会被归类到 IdleConns        // 放入 idle 中的 tcp 连接只会等着被关闭,再也不会被拿出来统计 		s.closeIdleConns()          // open 是一个计数器,代表了活跃的 tcp 连接数 		if open := atomic.LoadInt32(&s.open); open == 0 {            // 如果所有的活跃的 tcp 连接都没有了,那么就退出这个等待循环。            // 如果没有任何活跃的 tcp 连接,也就意味着没有任何正在处理中的 request            // 这个是 graceful shutdown 的关键判断条件            // 只有当所有请求都处理完了,才会退出服务器。满足这个条件时,必然不会存在丢失用户请求的情况。 			break 		} 		// This is not an optimal solution but using a sync.WaitGroup 		// here causes data races as it's hard to prevent Add() to be called 		// while Wait() is waiting. 		select { 		case <-ctx.Done(): // 外部是否触发了信号,导致 context 变成 done (通常都是 context.Background(),不会触发这个分支) 			err = ctx.Err() 			break END 		case <-ticker.C: // 每 100 毫秒检查一次 			continue 		} 	}  	s.done = nil 	s.ln = nil 	return err } 

3. fasthttp 对活跃连接的处理细节

tcp 连接数的累加过程

func (s *Server) Serve(ln net.Listener) error {     //... 	// Count our waiting to accept a connection as an open connection. 	// This way we can't get into any weird state where just after accepting 	// a connection Shutdown is called which reads open as 0 because it isn't 	// incremented yet. 	atomic.AddInt32(&s.open, 1) 	defer atomic.AddInt32(&s.open, -1) // 端口不再监听,才会退出下面的循环  	for { 		// 处理新连接进入 		if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { 			// 当 监听器 close 后,这里必然退出 			wp.Stop() 			if err == io.EOF { 				return nil 			} 			return err 		} 		s.setState(c, StateNew) 		atomic.AddInt32(&s.open, 1) // 每当来了一个新的 tcp 连接,活跃连接数 + 1        // ...   }   // ... } 

tcp 连接数的减少过程

 func (s *Server) serveConnCleanup() { 	atomic.AddInt32(&s.open, -1) 	atomic.AddUint32(&s.concurrency, ^uint32(0)) }  // 这个时候已经建立链接了 // 这个函数负责从连接上收/发数据 func (s *Server) serveConn(c net.Conn) (err error) { 	defer s.serveConnCleanup()  // 一旦退出这个函数,活跃连接数 - 1 	atomic.AddUint32(&s.concurrency, 1)     // ... } 

4. 阶段性总结

从以上的源码可以得到以下结论:

  • 当发起服务的关停信号后,首先就是关闭监听端口。也就是说:关停期间,新的 tcp 将无法再连接进来。上游的代理服务器如果配置好,会因为 tcp 无法建立连接而转发给别的可用的后端。
  • Shutdown() 在一个循环中不断检查活跃的 tcp 连接数,直到连接数为 0 才退出循环
    • 这样的逻辑必然导致:所有正在处理的请求一定全部处理完成后才会退出服务。

但是也存在另一种可能:

上游的代理服务器与当前的服务建立了长连接,然后不断在这个长连接上发送请求,导致服务永远不会 shutdown,直到 k8s 环境中触发 sigkill 强制杀掉容器。

可以推断出:如果一个 fasthttp 的服务在容器关闭时是因为 sigkill 而关闭的,那么它肯定没有做到 graceful shutdown.

5. fasthttp 中对于长连接的处理细节

func (s *Server) serveConn(c net.Conn) (err error) {     for {          // 1. 收数据          // 2. 解析 http 头          // 3. 调用用户配置的 callback handler          // 4. 检查是否有 connection close 标志 		connectionClose = connectionClose || 			(s.MaxRequestsPerConn > 0 && connRequestNum >= uint64(s.MaxRequestsPerConn)) ||  // 长连接上允许的最多的往返请求数 			ctx.Response.Header.ConnectionClose() || // http 协议里要求使用短连接 			(s.CloseOnShutdown && atomic.LoadInt32(&s.stop) == 1)  // 服务处于关停状态,且配置了 `一旦进入关停就不要再处理新连接了` 的标志          if connectionClose{               // 如果判断出这是一个需要关闭的连接,那么退出循环后就会把连接放到 Idel 中               break          }          // ...          // 5. 检查服务器关停的标志 		 if atomic.LoadInt32(&s.stop) == 1 { 			err = nil 			break  // 如果调用 Shutdown(), 则循环就不会再处理新的请求了 		 }     } } 

这里会存在另一种极端情况:上游在长连接中再次发来请求,因为循环中未调用 Read() 来接收数据,因此请求留在了内核的 socket buffer 里。
这种情况只有依赖上游的负载均衡来解决。

总结

从流程来看,fasthttp 的 Shutdown() 能够保障所有正在处理的请求都处理完后再关闭服务。
实际部署中可以通过检查 k8s 是否有 sigkill 来判断是否做到了 graceful shutdown.

发表评论

您必须 [ 登录 ] 才能发表留言!

相关文章