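Analysis based on gRPC Java 1.24.2
Conclusions
- gRPC keepalive is an application-level mechanism that the gRPC framework uses to keep a connection alive. When no business data is flowing on a gRPC connection, it determines whether ping/pong frames are sent to keep the connection active, so that it is not closed by the server or the operating system after a long idle period.
- Keepalive exists on both the client and the server. It is off by default on the client (keepAliveTime is Long.MAX_VALUE) and on by default on the server, where keepAliveTime is 2 hours, i.e. the server sends the client a ping after every 2 hours without traffic:
// io.grpc.internal.GrpcUtil
public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
- Keepalive is managed by the class io.grpc.internal.KeepAliveManager, which tracks the keepalive state and schedules and executes the ping task.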
Client-side KeepAlive
Entry point
- When creating a gRPC connection with the io.grpc framework, keepalive can be configured, for example:
NettyChannelBuilder builder = NettyChannelBuilder
    .forTarget(String.format("grpc://%s", provider))
    .usePlaintext()
    .defaultLoadBalancingPolicy(props.getBalancePolicy())
    .maxInboundMessageSize(props.getMaxInboundMessageSize())
    .keepAliveTime(1, TimeUnit.MINUTES)
    .keepAliveWithoutCalls(true)
    .keepAliveTimeout(10, TimeUnit.SECONDS)
    .intercept(channelManager.getInterceptors());
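- Three of these parameters relate to keepalive: keepAliveTime, keepAliveTimeout, and keepAliveWithoutCalls. What does each one do?
- keepAliveTime: how long the gRPC connection may carry no data before the client starts sending ping packets to the server.
- keepAliveTimeout: how long after sending a ping packet the client waits for the server's reply before treating the ping as timed out.
- keepAliveWithoutCalls: whether keepalive pings are still sent when the connection has no RPC in flight. Defaults to false.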
Brief sequence
Create & Start
NettyChannelBuilder -----> NettyTransportFactory ---------> NettyClientTransport -------------> KeepAliveManager & NettyClientHandler
Responding to events
When an Active, Idle, DataReceived, Started, or Termination event occurs, the KeepAlive state is updated and the ping task is scheduled accordingly.
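The event handling described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: the class KeepAliveSketch, its four states, and the manual clock in seconds are hypothetical simplifications for illustration and are not the real io.grpc.internal.KeepAliveManager, which uses a scheduler and more states.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of client keepalive scheduling; NOT the real KeepAliveManager. */
class KeepAliveSketch {
  enum State { IDLE, PING_SCHEDULED, PING_SENT, DISCONNECTED }

  private final long keepAliveTimeSec;
  private final long keepAliveTimeoutSec;
  private final boolean keepAliveWithoutCalls;
  private boolean hasCalls;   // true while at least one RPC is in flight
  private long deadlineSec;   // when the pending ping (or ping timeout) fires
  State state = State.IDLE;
  final List<String> log = new ArrayList<>();

  KeepAliveSketch(long keepAliveTimeSec, long keepAliveTimeoutSec, boolean keepAliveWithoutCalls) {
    this.keepAliveTimeSec = keepAliveTimeSec;
    this.keepAliveTimeoutSec = keepAliveTimeoutSec;
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
  }

  /** Started: with keepAliveWithoutCalls the ping timer starts right away. */
  void onTransportStarted(long nowSec) {
    if (keepAliveWithoutCalls) schedulePing(nowSec);
  }

  /** Active: an RPC started, so start the ping timer if it is not already running. */
  void onTransportActive(long nowSec) {
    hasCalls = true;
    if (state == State.IDLE) schedulePing(nowSec);
  }

  /** Idle: the last RPC finished; cancel the pending ping unless keepAliveWithoutCalls is set. */
  void onTransportIdle() {
    hasCalls = false;
    if (!keepAliveWithoutCalls && state == State.PING_SCHEDULED) state = State.IDLE;
  }

  /** DataReceived: inbound data (including a ping ack) shows the peer is alive. */
  void onDataReceived(long nowSec) {
    if (state == State.PING_SCHEDULED || state == State.PING_SENT) {
      if (hasCalls || keepAliveWithoutCalls) schedulePing(nowSec);
      else state = State.IDLE;
    }
  }

  /** Termination: the transport is gone, so no timer may fire any more. */
  void onTransportTermination() {
    state = State.DISCONNECTED;
  }

  /** Manual clock: fires every timer that is due at or before nowSec. */
  void advanceTo(long nowSec) {
    while ((state == State.PING_SCHEDULED || state == State.PING_SENT) && nowSec >= deadlineSec) {
      if (state == State.PING_SCHEDULED) {
        log.add("ping@" + deadlineSec);        // keepAliveTime elapsed without data
        state = State.PING_SENT;
        deadlineSec += keepAliveTimeoutSec;    // now wait for the ack
      } else {
        log.add("timeout@" + deadlineSec);     // no ack within keepAliveTimeout
        state = State.DISCONNECTED;
      }
    }
  }

  private void schedulePing(long nowSec) {
    state = State.PING_SCHEDULED;
    deadlineSec = nowSec + keepAliveTimeSec;
  }

  public static void main(String[] args) {
    // keepAliveTime = 60s, keepAliveTimeout = 10s, keepAliveWithoutCalls = true
    KeepAliveSketch k = new KeepAliveSketch(60, 10, true);
    k.onTransportStarted(0);
    k.advanceTo(30);
    k.onDataReceived(30);  // traffic at t=30 pushes the next ping to t=90
    k.advanceTo(95);       // the ping goes out at t=90
    k.onDataReceived(96);  // ack arrives in time; next ping is due at t=156
    k.advanceTo(170);      // no ack this time, so the timeout fires at t=166
    System.out.println(k.log + " " + k.state);
  }
}
```

The point of the model is that received data always postpones the next ping, and that a ping left unanswered for keepAliveTimeout ends the connection.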
Server-side KeepAlive
Entry point
// Key code only; see `NettyServerBuilder` for the full source
ServerImpl server = new ServerImpl(
    this, buildTransportServers(getTracerFactories()), Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
  notifyTarget.notifyOnBuild(server);
}
return server;

// The NettyServer instances are created in the buildTransportServers method
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {
  NettyServer transportServer = new NettyServer(
      listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
      workerEventLoopGroupPool, negotiator, streamTracerFactories,
      getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
      maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
      maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
      permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
  transportServers.add(transportServer);
}
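The keepalive-related values handed to NettyServer above are set through the public setters on NettyServerBuilder. The fragment below is a minimal configuration sketch, assuming grpc-netty is on the classpath; the class name KeepAliveServerExample, the port 50051, and the chosen durations are illustrative assumptions, not values from the original article.

```java
import io.grpc.Server;
import io.grpc.netty.NettyServerBuilder;
import java.util.concurrent.TimeUnit;

class KeepAliveServerExample {
  public static void main(String[] args) throws Exception {
    Server server = NettyServerBuilder.forPort(50051)  // illustrative port
        // Server-initiated pings: interval and how long to wait for the ack.
        .keepAliveTime(2, TimeUnit.HOURS)
        .keepAliveTimeout(20, TimeUnit.SECONDS)
        // Enforcement of client pings: shortest client ping interval that is
        // tolerated, and whether pings are tolerated with no RPC in flight.
        .permitKeepAliveTime(1, TimeUnit.MINUTES)
        .permitKeepAliveWithoutCalls(true)
        // Services would be registered here with addService(...).
        .build()
        .start();
    server.awaitTermination();
  }
}
```

With these settings, a client configured with keepAliveTime(1, TimeUnit.MINUTES) and keepAliveWithoutCalls(true) stays within what the server permits.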
Brief sequence
Create & Start
NettyServerBuilder ---> NettyServer ---------> NettyServerTransport -------------> NettyServerHandler -----------------> KeepAliveEnforcer
Connection ready
The handlerAdded method of io.netty.channel.ChannelHandler is called. Its documentation reads:
Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.
NettyServerHandler(handlerAdded) ---> creates the KeepAliveManager object
Responding to events
Same as on the client.
KeepAliveEnforcer
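As the brief server-side sequence above shows, the server has a class of its own, io.grpc.netty.KeepAliveEnforcer.
This class monitors how often the client sends pings, to make sure the rate stays within a permitted range.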
package io.grpc.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;

/** Monitors the client's PING usage to make sure the rate is permitted. */
class KeepAliveEnforcer {
  @VisibleForTesting
  static final int MAX_PING_STRIKES = 2;
  @VisibleForTesting
  static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

  private final boolean permitWithoutCalls;
  private final long minTimeNanos;
  private final Ticker ticker;
  private final long epoch;

  private long lastValidPingTime;
  private boolean hasOutstandingCalls;
  private int pingStrikes;

  public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
    this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);
  }

  @VisibleForTesting
  KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {
    Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative");

    this.permitWithoutCalls = permitWithoutCalls;
    this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
    this.ticker = ticker;
    this.epoch = ticker.nanoTime();
    lastValidPingTime = epoch;
  }

  /** Returns {@code false} when client is misbehaving and should be disconnected. */
  @CheckReturnValue
  public boolean pingAcceptable() {
    long now = ticker.nanoTime();
    boolean valid;
    if (!hasOutstandingCalls && !permitWithoutCalls) {
      valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;
    } else {
      valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;
    }
    if (!valid) {
      pingStrikes++;
      return !(pingStrikes > MAX_PING_STRIKES);
    } else {
      lastValidPingTime = now;
      return true;
    }
  }

  /**
   * Reset any counters because PINGs are allowed in response to something sent. Typically called
   * when sending HEADERS and DATA frames.
   */
  public void resetCounters() {
    lastValidPingTime = epoch;
    pingStrikes = 0;
  }

  /** There are outstanding RPCs on the transport. */
  public void onTransportActive() {
    hasOutstandingCalls = true;
  }

  /** There are no outstanding RPCs on the transport. */
  public void onTransportIdle() {
    hasOutstandingCalls = false;
  }

  /**
   * Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important
   * to use something like this instead of directly comparing nano times. See {@link
   * System#nanoTime}.
   */
  private static long compareNanos(long time1, long time2) {
    // Possibility of overflow/underflow is on purpose and necessary for correctness
    return time1 - time2;
  }

  @VisibleForTesting
  interface Ticker {
    long nanoTime();
  }

  @VisibleForTesting
  static class SystemTicker implements Ticker {
    public static final SystemTicker INSTANCE = new SystemTicker();

    @Override
    public long nanoTime() {
      return System.nanoTime();
    }
  }
}
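- First, look at the pingAcceptable method, which decides whether a ping from the client is accepted. It relies on the following fields and constants:
  - lastValidPingTime: the time of the client's last valid ping. When the connection is established it equals the time the KeepAliveEnforcer object was created; each time a client ping is judged valid, it is set to the time of that ping.
  - hasOutstandingCalls: initially false. It becomes true when the connection is active and false when the connection is idle. For a blocking gRPC call, the connection becomes active when the call starts and idle again when the call completes.
  - permitWithoutCalls: passed in when the NettyServer is created; defaults to false.
  - IMPLICIT_PERMIT_TIME_NANOS: a constant, 2 hours.
  - minTimeNanos: passed in when the NettyServer is created; defaults to 5 minutes.
  - MAX_PING_STRIKES: a constant, 2.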
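- The resetCounters method is called when data flows on the gRPC connection (per its Javadoc, typically when HEADERS and DATA frames are sent), so whenever there is a gRPC call, lastValidPingTime and pingStrikes are reset.
- For a client to use keepalive without being treated as misbehaving, its keepAliveTime must be >= the server's minTimeNanos. If the client also pings while no call is in progress (keepAliveWithoutCalls(true)), the server's permitWithoutCalls must be set to true as well. Otherwise each early ping counts as a strike, and once pingStrikes exceeds MAX_PING_STRIKES, pingAcceptable returns false and the client should be disconnected.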
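The interaction between the client's ping interval and the server-side settings can be checked with a small simulation. This is a sketch only: EnforcerSketch is a hypothetical, trimmed copy of the pingAcceptable logic shown above, with the current time passed in explicitly instead of read from a Ticker, and the helper firstRejectedPing is invented for illustration. It assumes no traffic flows between pings, so resetCounters is never triggered.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, trimmed copy of the KeepAliveEnforcer decision logic with an explicit clock. */
class EnforcerSketch {
  static final int MAX_PING_STRIKES = 2;
  static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

  final boolean permitWithoutCalls;
  final long minTimeNanos;
  long lastValidPingTime = 0;   // the connection is assumed to start at t = 0
  boolean hasOutstandingCalls;
  int pingStrikes;

  EnforcerSketch(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
    this.permitWithoutCalls = permitWithoutCalls;
    this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
  }

  /** Same decision as pingAcceptable, but the caller supplies the current time. */
  boolean pingAcceptable(long nowNanos) {
    long required = (!hasOutstandingCalls && !permitWithoutCalls)
        ? IMPLICIT_PERMIT_TIME_NANOS
        : minTimeNanos;
    boolean valid = (lastValidPingTime + required) - nowNanos <= 0;
    if (!valid) {
      pingStrikes++;
      return pingStrikes <= MAX_PING_STRIKES;
    }
    lastValidPingTime = nowNanos;
    return true;
  }

  /**
   * Sends `count` pings, one every `intervalMinutes`, and returns the 1-based index of the
   * first ping that gets the client disconnected, or -1 if every ping is tolerated.
   */
  static int firstRejectedPing(EnforcerSketch e, long intervalMinutes, int count) {
    for (int i = 1; i <= count; i++) {
      long now = TimeUnit.MINUTES.toNanos(intervalMinutes * i);
      if (!e.pingAcceptable(now)) {
        return i;
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // Server defaults (permitWithoutCalls=false, minTime=5min), idle client pinging every minute.
    System.out.println(firstRejectedPing(new EnforcerSketch(false, 5, TimeUnit.MINUTES), 1, 10));
    // permitWithoutCalls=true but minTime still 5min: pinging every minute is still too frequent.
    System.out.println(firstRejectedPing(new EnforcerSketch(true, 5, TimeUnit.MINUTES), 1, 10));
    // permitWithoutCalls=true and minTime=1min: the same client is tolerated.
    System.out.println(firstRejectedPing(new EnforcerSketch(true, 1, TimeUnit.MINUTES), 1, 10));
  }
}
```

In the first two scenarios the third ping is the one rejected, because two strikes are tolerated and the third exceeds MAX_PING_STRIKES; in the last scenario every ping arrives no earlier than minTimeNanos after the previous valid one, so none is counted as a strike.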