作者:张富春(ahfuzhang),转载时请注明作者和引用链接,谢谢!
因为一个基于 fasthttp 的服务在发布上出现了一定量的 503 错误,由此怀疑 fasthttp 可能没有很好地实现 graceful shutdown.
本文将通过源码阅读的方式,推导 fasthttp 实现 graceful shutdown 的细节。
1. 业务代码中的 graceful shutdown 实现方法
func main(){ // ... // 容器退出前会先发送 SIGTERM 信号 sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) // server := &fasthttp.Server{ Handler: httphandle.FastHTTPHandler, } go func() { log.Info("Server start on", cfg.HTTP.Addr) if err := server.ListenAndServe(cfg.HTTP.Addr); err != nil { log.Fatal(err.Error()) } log.Info("Http Server Ended") }() // server.CloseOnShutdown = true <-sigs // 一旦收到信号,就开始进入退出流程 log.Info("Ready to shutdown...") // 避免缓存中的日志没有刷到磁盘 log.Close() _ = server.Shutdown() // 这一行是实现 graceful shutdown 的关键 }
2. Shutdown() 函数的实现细节
func (s *Server) ShutdownWithContext(ctx context.Context) (err error) { s.mu.Lock() defer s.mu.Unlock() // stop 标志置 1 // stop 标志主要影响 tcp 长连接中,是否在一个连接上处理多次请求和响应后 —— 如果遇到服务关停的消息,如何决定长连接上的后续行为 atomic.StoreInt32(&s.stop, 1) defer atomic.StoreInt32(&s.stop, 0) if s.ln == nil { return nil } // 所有监听的端口进行 close // 关闭监听的端口后,意味着新的 tcp 连接将不再 accept for _, ln := range s.ln { if err = ln.Close(); err != nil { return err } } // done 是一个用于信号通知的 channel // done 用于影响 RequestCtx 端的服务关停消息 // 例如:业务代码如果做很长时间的处理,可以通过 reqCtx.Done() 来判断服务是否开始关停 if s.done != nil { close(s.done) // 调用后,业务代码中就可以通过 reqCtx.Done() 知道服务开始关停了 } // Closing the listener will make Serve() call Stop on the worker pool. // Setting .stop to 1 will make serveConn() break out of its loop. // Now we just have to wait until all workers are done or timeout. ticker := time.NewTicker(time.Millisecond * 100) defer ticker.Stop() END: for { // 主动关闭空闲连接 // 只有当前没有请求和响应的空闲长连接,才会被归类到 IdleConns // 放入 idle 中的 tcp 连接只会等着被关闭,再也不会被拿出来统计 s.closeIdleConns() // open 是一个计数器,代表了活跃的 tcp 连接数 if open := atomic.LoadInt32(&s.open); open == 0 { // 如果所有的活跃的 tcp 连接都没有了,那么就退出这个等待循环。 // 如果没有任何活跃的 tcp 连接,也就意味着没有任何正在处理中的 request // 这个是 graceful shutdown 的关键判断条件 // 只有当所有请求都处理完了,才会退出服务器。满足这个条件时,必然不会存在丢失用户请求的情况。 break } // This is not an optimal solution but using a sync.WaitGroup // here causes data races as it's hard to prevent Add() to be called // while Wait() is waiting. select { case <-ctx.Done(): // 外部是否触发了信号,导致 context 变成 done (通常都是 context.Background(),不会触发这个分支) err = ctx.Err() break END case <-ticker.C: // 每 100 毫秒检查一次 continue } } s.done = nil s.ln = nil return err }
3. fasthttp 对活跃连接的处理细节
tcp 连接数的累加过程
func (s *Server) Serve(ln net.Listener) error { //... // Count our waiting to accept a connection as an open connection. // This way we can't get into any weird state where just after accepting // a connection Shutdown is called which reads open as 0 because it isn't // incremented yet. atomic.AddInt32(&s.open, 1) defer atomic.AddInt32(&s.open, -1) // 端口不再监听,才会退出下面的循环 for { // 处理新连接进入 if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { // 当 监听器 close 后,这里必然退出 wp.Stop() if err == io.EOF { return nil } return err } s.setState(c, StateNew) atomic.AddInt32(&s.open, 1) // 每当来了一个新的 tcp 连接,活跃连接数 + 1 // ... } // ... }
tcp 连接数的减少过程
func (s *Server) serveConnCleanup() { atomic.AddInt32(&s.open, -1) atomic.AddUint32(&s.concurrency, ^uint32(0)) } // 这个时候已经建立链接了 // 这个函数负责从连接上收/发数据 func (s *Server) serveConn(c net.Conn) (err error) { defer s.serveConnCleanup() // 一旦退出这个函数,活跃连接数 - 1 atomic.AddUint32(&s.concurrency, 1) // ... }
4. 阶段性总结
从以上的源码可以得到以下结论:
- 当发起服务的关停信号后,首先就是关闭监听端口。也就是说:关停期间,新的 tcp 将无法再连接进来。上游的代理服务器如果配置好,会因为 tcp 无法建立连接而转发给别的可用的后端。
- Shutdown() 在一个循环中不断检查活跃的 tcp 连接数,直到连接数为 0 才退出循环
- 这样的逻辑必然导致:所有正在处理的请求一定全部处理完成后才会退出服务。
但是也存在另一种可能:
上游的代理服务器与当前的服务建立了长连接,然后不断在这个长连接上发送请求,导致服务永远不会 shutdown,直到 k8s 环境中触发 sigkill 强制杀掉容器。
可以推断出:如果一个 fasthttp 的服务在容器关闭时是因为 sigkill 而关闭的,那么它肯定没有做到 graceful shutdown.
5. fasthttp 中对于长连接的处理细节
func (s *Server) serveConn(c net.Conn) (err error) { for { // 1. 收数据 // 2. 解析 http 头 // 3. 调用用户配置的 callback handler // 4. 检查是否有 connection close 标志 connectionClose = connectionClose || (s.MaxRequestsPerConn > 0 && connRequestNum >= uint64(s.MaxRequestsPerConn)) || // 长连接上允许的最多的往返请求数 ctx.Response.Header.ConnectionClose() || // http 协议里要求使用短连接 (s.CloseOnShutdown && atomic.LoadInt32(&s.stop) == 1) // 服务处于关停状态,且配置了 `一旦进入关停就不要再处理新连接了` 的标志 if connectionClose{ // 如果判断出这是一个需要关闭的连接,那么退出循环后就会把连接放到 Idel 中 break } // ... // 5. 检查服务器关停的标志 if atomic.LoadInt32(&s.stop) == 1 { err = nil break // 如果调用 Shutdown(), 则循环就不会再处理新的请求了 } } }
这里会存在另一种极端情况:上游在长连接中再次发来请求,因为循环中未调用 Read() 来接收数据,因此请求留在了内核的 socket buffer 里。
这种情况只有依赖上游的负载均衡来解决。
总结
从流程来看,fasthttp 的 Shutdown() 能够保障所有正在处理的请求都处理完后再关闭服务。
实际部署中可以通过检查 k8s 是否有 sigkill 来判断是否做到了 graceful shutdown.
