A Study of the gRPC (Java) keepAlive Mechanism

This analysis is based on gRPC Java 1.24.2.

Conclusions

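  1. gRPC keepAlive is an application-level mechanism that the gRPC framework provides for keeping connections alive. When no application data is flowing on a gRPC connection, it determines whether ping/pong frames are sent so that the connection stays active and is not closed by the server or the operating system after a long idle period.
  2. gRPC keepAlive exists on both the client and the server. On the client it is disabled by default (keepAliveTime is Long.MAX_VALUE). On the server it is enabled by default with a keepAliveTime of 2 hours, i.e. the server sends a ping to the client every 2 hours.
// io.grpc.internal.GrpcUtil
public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
  3. KeepAlive is managed by the class io.grpc.internal.KeepAliveManager, which tracks the keepAlive state and schedules and executes the ping tasks.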

Client-side KeepAlive

Entry point

  1. When creating a gRPC connection with the io.grpc framework, keepAlive can be configured, for example:
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(String.format("grpc://%s", provider))
      .usePlaintext()
      .defaultLoadBalancingPolicy(props.getBalancePolicy())
      .maxInboundMessageSize(props.getMaxInboundMessageSize())
      .keepAliveTime(1, TimeUnit.MINUTES)
      .keepAliveWithoutCalls(true)
      .keepAliveTimeout(10, TimeUnit.SECONDS)
      .intercept(channelManager.getInterceptors());
  2. Three of these parameters relate to keepAlive: keepAliveTime, keepAliveTimeout and keepAliveWithoutCalls. What does each one do?
  • keepAliveTime: how long the gRPC connection must go without data before the client starts sending ping packets to the server.
  • keepAliveTimeout: how long the client waits for the server's reply after sending a ping packet before treating it as a timeout.
  • keepAliveWithoutCalls: whether keepAlive is still performed when the connection carries no data because no calls are in progress. The default is false.
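To make the three settings concrete, here is a small sketch of the timeline they imply, using the values from the builder call above (1 minute and 10 seconds). It uses only the JDK. The class and method names (KeepAliveTimeline, pingDueAfterNanos, timeoutAfterNanos, pingsEnabled) are hypothetical and are not part of gRPC, and the model assumes that the idle timer restarts whenever data is received.

```java
import java.util.concurrent.TimeUnit;

// Illustrative model only; none of these names exist in gRPC.
class KeepAliveTimeline {

    /** Nanoseconds of silence after which a ping becomes due. */
    static long pingDueAfterNanos(long keepAliveTime, TimeUnit unit) {
        return unit.toNanos(keepAliveTime);
    }

    /** Nanoseconds of silence after which an unanswered ping counts as timed out. */
    static long timeoutAfterNanos(long keepAliveTime, TimeUnit timeUnit,
                                  long keepAliveTimeout, TimeUnit timeoutUnit) {
        return timeUnit.toNanos(keepAliveTime) + timeoutUnit.toNanos(keepAliveTimeout);
    }

    /** Pings are sent while calls are in progress, or always if keepAliveWithoutCalls is set. */
    static boolean pingsEnabled(boolean hasActiveCalls, boolean keepAliveWithoutCalls) {
        return hasActiveCalls || keepAliveWithoutCalls;
    }

    public static void main(String[] args) {
        long pingDue = pingDueAfterNanos(1, TimeUnit.MINUTES);
        long dead = timeoutAfterNanos(1, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);
        System.out.println("ping due after " + TimeUnit.NANOSECONDS.toSeconds(pingDue) + "s of silence");
        System.out.println("timeout after " + TimeUnit.NANOSECONDS.toSeconds(dead) + "s of silence");
        System.out.println("pings on a connection without calls: " + pingsEnabled(false, true));
    }
}
```

With the default keepAliveWithoutCalls of false, pingsEnabled(false, false) is false in this model, which is why a connection with no calls in progress sends no pings unless the flag is set.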

Brief sequence

Create & Start

NettyChannelBuilder
    -----> NettyTransportFactory
    ---------> NettyClientTransport
    -------------> KeepAliveManager & NettyClientHandler

Responding to events
When an Active, Idle, DataReceived, Started or Termination event occurs, the KeepAlive state is updated and the ping task is scheduled accordingly.
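The event handling described above can be pictured as a small state machine. The following is a simplified sketch, not the library's code: the class name KeepAliveStateSketch, the state names and the exact transitions are assumptions made for illustration, and the two timers are represented by explicit method calls instead of scheduled tasks.

```java
// Simplified, hypothetical model of keepAlive event handling (not io.grpc code).
class KeepAliveStateSketch {
    enum State { IDLE, PING_SCHEDULED, PING_DELAYED, PING_SENT, IDLE_AND_PING_SENT, DISCONNECTED }

    private final boolean keepAliveWithoutCalls;
    private State state = State.IDLE;

    KeepAliveStateSketch(boolean keepAliveWithoutCalls) {
        this.keepAliveWithoutCalls = keepAliveWithoutCalls;
    }

    State state() { return state; }

    /** Started: begin scheduling right away only if pings are wanted without calls. */
    void onTransportStarted() {
        if (keepAliveWithoutCalls) onTransportActive();
    }

    /** Active: a call is in progress, so a ping gets scheduled. */
    void onTransportActive() {
        if (state == State.IDLE) state = State.PING_SCHEDULED;
        else if (state == State.IDLE_AND_PING_SENT) state = State.PING_SENT;
    }

    /** Idle: no calls in progress; stop scheduling unless pings without calls are enabled. */
    void onTransportIdle() {
        if (keepAliveWithoutCalls) return;
        if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) state = State.IDLE;
        else if (state == State.PING_SENT) state = State.IDLE_AND_PING_SENT;
    }

    /** DataReceived: the peer is alive, so postpone the ping or clear the pending one. */
    void onDataReceived() {
        if (state == State.PING_SCHEDULED) state = State.PING_DELAYED;
        else if (state == State.PING_SENT) state = State.PING_SCHEDULED;
        else if (state == State.IDLE_AND_PING_SENT) state = State.IDLE;
    }

    /** keepAliveTime timer fired; returns true if a ping should be written now. */
    boolean onPingTimerFired() {
        if (state == State.PING_SCHEDULED) { state = State.PING_SENT; return true; }
        if (state == State.PING_DELAYED) state = State.PING_SCHEDULED; // data arrived, wait again
        return false;
    }

    /** keepAliveTimeout timer fired; returns true if the connection should be closed. */
    boolean onTimeoutTimerFired() {
        if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
            state = State.DISCONNECTED;
            return true;
        }
        return false;
    }

    /** Termination: nothing more to schedule. */
    void onTransportTermination() { state = State.DISCONNECTED; }

    public static void main(String[] args) {
        KeepAliveStateSketch m = new KeepAliveStateSketch(false);
        m.onTransportStarted();
        System.out.println("after start:    " + m.state());
        m.onTransportActive();
        System.out.println("after active:   " + m.state());
        System.out.println("ping written:   " + m.onPingTimerFired());
        System.out.println("closed:         " + m.onTimeoutTimerFired());
    }
}
```

In this model a connection created with keepAliveWithoutCalls set to true moves to PING_SCHEDULED as soon as it starts and never drops back to IDLE, which matches the description of the flag given earlier.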

Server-side KeepAlive

Entry point

// Only the key code is shown; see `NettyServerBuilder` for the full code
ServerImpl server = new ServerImpl(
    this,
    buildTransportServers(getTracerFactories()),
    Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
  notifyTarget.notifyOnBuild(server);
}
return server;

// The NettyServer instances are created in the buildTransportServers method
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
  NettyServer transportServer = new NettyServer(
      listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
      workerEventLoopGroupPool, negotiator, streamTracerFactories,
      getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
      maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
      maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
      permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
  transportServers.add(transportServer);
}

Brief sequence

Create & Start

NettyServerBuilder
    ---> NettyServer
    ---------> NettyServerTransport
    -------------> NettyServerHandler
    -----------------> KeepAliveEnforcer

Connection ready
The handlerAdded method of io.netty.channel.ChannelHandler is called. Its documentation describes it as follows:

Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.

NettyServerHandler(handlerAdded)
    ---> creates the KeepAliveManager object

Responding to events
Same as on the client.

KeepAliveEnforcer

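As the server-side sequence above shows, the server has a class of its own, io.grpc.netty.KeepAliveEnforcer.
This class monitors how often the client pings, to make sure the rate stays within a reasonable range.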

package io.grpc.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;

/** Monitors the client's PING usage to make sure the rate is permitted. */
class KeepAliveEnforcer {
  @VisibleForTesting
  static final int MAX_PING_STRIKES = 2;
  @VisibleForTesting
  static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

  private final boolean permitWithoutCalls;
  private final long minTimeNanos;
  private final Ticker ticker;
  private final long epoch;

  private long lastValidPingTime;
  private boolean hasOutstandingCalls;
  private int pingStrikes;

  public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
    this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);
  }

  @VisibleForTesting
  KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {
    Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative");

    this.permitWithoutCalls = permitWithoutCalls;
    this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
    this.ticker = ticker;
    this.epoch = ticker.nanoTime();
    lastValidPingTime = epoch;
  }

  /** Returns {@code false} when client is misbehaving and should be disconnected. */
  @CheckReturnValue
  public boolean pingAcceptable() {
    long now = ticker.nanoTime();
    boolean valid;
    if (!hasOutstandingCalls && !permitWithoutCalls) {
      valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;
    } else {
      valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;
    }
    if (!valid) {
      pingStrikes++;
      return !(pingStrikes > MAX_PING_STRIKES);
    } else {
      lastValidPingTime = now;
      return true;
    }
  }

  /**
   * Reset any counters because PINGs are allowed in response to something sent. Typically called
   * when sending HEADERS and DATA frames.
   */
  public void resetCounters() {
    lastValidPingTime = epoch;
    pingStrikes = 0;
  }

  /** There are outstanding RPCs on the transport. */
  public void onTransportActive() {
    hasOutstandingCalls = true;
  }

  /** There are no outstanding RPCs on the transport. */
  public void onTransportIdle() {
    hasOutstandingCalls = false;
  }

  /**
   * Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important
   * to use something like this instead of directly comparing nano times. See {@link
   * System#nanoTime}.
   */
  private static long compareNanos(long time1, long time2) {
    // Possibility of overflow/underflow is on purpose and necessary for correctness
    return time1 - time2;
  }

  @VisibleForTesting
  interface Ticker {
    long nanoTime();
  }

  @VisibleForTesting
  static class SystemTicker implements Ticker {
    public static final SystemTicker INSTANCE = new SystemTicker();

    @Override
    public long nanoTime() {
      return System.nanoTime();
    }
  }
}
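  1. Start with the pingAcceptable method, which decides whether a client ping is accepted.
  • lastValidPingTime is the time of the client's last valid ping. When the connection is established it equals the time the KeepAliveEnforcer object was created. Whenever a client ping is valid, it is set to the time of that ping.
  • hasOutstandingCalls is initially false. It becomes true when the connection is active and false when the connection is idle. For a blocking gRPC call, the connection becomes active when the call starts and idle when the call completes.
  • permitWithoutCalls is passed in when the NettyServer is created and defaults to false.
  • IMPLICIT_PERMIT_TIME_NANOS is a constant, 2 hours.
  • minTimeNanos is passed in when the NettyServer is created and defaults to 5 minutes.
  • MAX_PING_STRIKES is a constant, 2.
  2. The resetCounters method is called when data is sent on the connection (per its Javadoc, typically HEADERS and DATA frames), so lastValidPingTime and pingStrikes are reset whenever there is a gRPC call.
  3. For a client to use keepAlive on a connection with no calls in progress, the server's permitWithoutCalls must be set to true and the client's keepAliveTime must be >= minTimeNanos. Otherwise every ping that arrives too early counts as a strike, and once pingStrikes exceeds MAX_PING_STRIKES, pingAcceptable returns false, meaning the client should be disconnected.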
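The strike counting is easier to follow with concrete numbers. Below is a minimal sketch that condenses the pingAcceptable logic from the listing above into a JDK-only class, with the current time passed in explicitly instead of read from a Ticker. EnforcerSketch and the helper firstRejectedPing are hypothetical names introduced for this example, and the connection is assumed to stay idle (hasOutstandingCalls is always false).

```java
import java.util.concurrent.TimeUnit;

// Condensed, hypothetical re-statement of the pingAcceptable logic for experimenting.
class EnforcerSketch {
    static final int MAX_PING_STRIKES = 2;
    static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

    private final boolean permitWithoutCalls;
    private final long minTimeNanos;
    private final boolean hasOutstandingCalls = false; // assumption: no calls in progress
    private long lastValidPingTime = 0;                // epoch of this sketch is t = 0
    private int pingStrikes;

    EnforcerSketch(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
        this.permitWithoutCalls = permitWithoutCalls;
        this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
    }

    /** Same decision as the original method, with the clock value supplied by the caller. */
    boolean pingAcceptable(long now) {
        long required = (!hasOutstandingCalls && !permitWithoutCalls)
                ? IMPLICIT_PERMIT_TIME_NANOS
                : minTimeNanos;
        boolean valid = (lastValidPingTime + required) - now <= 0;
        if (!valid) {
            pingStrikes++;
            return pingStrikes <= MAX_PING_STRIKES;
        }
        lastValidPingTime = now;
        return true;
    }

    /** Sends up to count pings at a fixed interval; returns the index of the first refused ping, or -1. */
    static int firstRejectedPing(EnforcerSketch enforcer, long interval, TimeUnit unit, int count) {
        long step = unit.toNanos(interval);
        for (int i = 1; i <= count; i++) {
            if (!enforcer.pingAcceptable(i * step)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Server defaults (permitWithoutCalls=false, 5 min), client pings every minute: prints 3
        System.out.println(firstRejectedPing(
                new EnforcerSketch(false, 5, TimeUnit.MINUTES), 1, TimeUnit.MINUTES, 10));
        // permitWithoutCalls=true, client pings every 5 minutes: prints -1
        System.out.println(firstRejectedPing(
                new EnforcerSketch(true, 5, TimeUnit.MINUTES), 5, TimeUnit.MINUTES, 10));
    }
}
```

In the first case each ping is checked against the 2-hour implicit limit, so the first two pings only accumulate strikes and the third is refused. In the second case every ping arrives exactly minTimeNanos after the previous valid one and is accepted.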