Netty 学习(五):服务端启动核心流程源码说明

Netty 学习(五):服务端启动核心流程源码说明

作者: Grey

原文地址:

博客园:Netty 学习(五):服务端启动核心流程源码说明

CSDN:Netty 学习(五):服务端启动核心流程源码说明

说明

本文使用的 Netty 版本是 4.1.82.Final,

        <dependency>             <groupId>io.netty</groupId>             <artifactId>netty-all</artifactId>             <version>4.1.82.Final</version>         </dependency> 

服务端在启动的时候,主要流程有如下几个

  1. 创建服务端的 Channel

  2. 初始化服务端的 Channel

  3. 注册 Selector

  4. 端口绑定

我们可以写一个简单的服务端代码,通过 Debug 的方式查看这几个关键流程的核心代码。

package source;  import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;  /**  * 代码阅读  *  * @author <a href="mailto:410486047@qq.com">Grey</a>  * @date 2022/9/12  * @since  */ public final class SimpleServer {     public static void main(String[] args) throws InterruptedException {         // EventLoopGroup: 服务端的线程模型外观类。这个线程要做的事情         // 就是不停地检测IO事件,处理IO事件,执行任务。         EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workerGroup = new NioEventLoopGroup();         try {             // 服务端的一个启动辅助类。通过给它设置一系列参数来绑定端口启动服务。             ServerBootstrap b = new ServerBootstrap();             b                     // 设置服务端的线程模型。                     // bossGroup 负责不断接收新的连接,将新的连接交给 workerGroup 来处理。                     .group(bossGroup, workerGroup)                     // 设置服务端的 IO 类型是 NIO。Netty 通过指定 Channel 的类型来指定 IO 类型。                     .channel(NioServerSocketChannel.class)                     // 服务端启动过程中,需要经过哪些流程。                     .handler(new ChannelInboundHandlerAdapter() {                         @Override                         public void channelActive(ChannelHandlerContext ctx) {                             System.out.println("channelActive");                         }                          @Override                         public void channelRegistered(ChannelHandlerContext ctx) {                             System.out.println("channelRegistered");                         }                          @Override                         public void handlerAdded(ChannelHandlerContext ctx) {                             System.out.println("handlerAdded");                         }                     })                     // 用于设置一系列 Handler 来处理每个连接的数据                     .childHandler(new ChannelInitializer<SocketChannel>() {                         @Override                         protected void initChannel(SocketChannel socketChannel) {                          }                     });             // 绑定端口同步等待。等服务端启动完毕,才会进入下一行代码             ChannelFuture f = b.bind(8888).sync();             // 等待服务端关闭端口绑定,这里的作用是让程序不会退出             f.channel().closeFuture().sync();         } finally {             bossGroup.shutdownGracefully();             workerGroup.shutdownGracefully();         }     } } 

通过

ChannelFuture f = b.bind(8888).sync(); 

bind方法,进入源码进行查看。

首先,进入的是AbstractBootstrap中,调用的最关键的方法是如下两个:

……     private ChannelFuture doBind(final SocketAddress localAddress) {         ……         final ChannelFuture regFuture = initAndRegister();         ……         doBind0(regFuture, channel, localAddress, promise);         ……     } …… 

进入initAndResgister()方法中

……     final ChannelFuture initAndRegister() {         ……         // channel 的新建         channel = channelFactory.newChannel();         // channel 的初始化         init(channel);         ……     } …… 

这里完成了 Channel 的新建和初始化,Debug 进去,发现channelFactory.newChannel()实际上是调用了ReflectiveChannelFactorynewChannel方法,

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { ……     private final Constructor<? extends T> constructor;      public ReflectiveChannelFactory(Class<? extends T> clazz) {         ……         this.constructor = clazz.getConstructor();         ……     }      @Override     public T newChannel() {         ……         return constructor.newInstance();         ……     } …… }  

这里调用了反射方法,其实就是将服务端代码中的这一行.channel(NioServerSocketChannel.class)中的NioServerSocketChannel.class传入进行对象创建,创建一个NioServerSocketChannel实例。

在创建NioServerSocketChannel的时候,调用了NioServerSocketChannel的构造方法,构造方法的主要逻辑如下

……     public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {         this(newChannel(provider, family));     }     public NioServerSocketChannel(ServerSocketChannel channel) {         super(null, channel, SelectionKey.OP_ACCEPT);         config = new NioServerSocketChannelConfig(this, javaChannel().socket());     }     private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {         ……             ServerSocketChannel channel =                     SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);             return channel == null ? provider.openServerSocketChannel() : channel;         ……     } …… 

其中provider.openServerSocketChannel()就是调用底层 JDK 的 API,获取了 JDK 底层的java.nio.channels.ServerSocketChannel

通过super(null, channel, SelectionKey.OP_ACCEPT);一路跟踪进去,进入AbstractNioChannel中,

   protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {         super(parent);         ……         ch.configureBlocking(false);         ……     } 

关键代码是ch.configureBlocking(false),设置 I/O 模型为非阻塞模式。

通过super(parent)跟踪上去,

    protected AbstractChannel(Channel parent) {         this.parent = parent;         id = newId();         unsafe = newUnsafe();         pipeline = newChannelPipeline();     } 

其中 id 是 Netty 中每条 Channel 的唯一标识。

以上就是服务端 Channel 的创建过程。

接下来是服务端 Channel 的初始化过程,回到AbstractBootstrap.initAndResgister()方法

……     final ChannelFuture initAndRegister() {         ……         // channel 的新建         channel = channelFactory.newChannel();         // channel 的初始化         init(channel);         ……     } …… 

其中的init(channel)方法就是服务端的 Channel 的初始化过程,Debug 进入,发现是调用了ServerBootstrap.init(channel)方法,

     @Override     void init(Channel channel) {         ……         // 设置一些 Channel 的属性和配置信息         ……         p.addLast(new ChannelInitializer<Channel>() {             @Override             public void initChannel(final Channel ch) {                 final ChannelPipeline pipeline = ch.pipeline();                 ChannelHandler handler = config.handler();                 if (handler != null) {                     pipeline.addLast(handler);                 }                  ch.eventLoop().execute(new Runnable() {                     @Override                     public void run() {                         pipeline.addLast(new ServerBootstrapAcceptor(                                 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                     }                 });             }         });     } 

其核心代码如上,主要用于定义服务端启动过程中需要执行哪些逻辑。主要分为两块:

  1. 一块是添加用户自定义的处理逻辑到服务端启动流程。

  2. 另一块是添加一个特殊的处理逻辑,ServerBootstrapAcceptor 是一个接入器,接受新请求,把新的请求传递给某个事件循环器。

以上就是服务端的 Channel 的初始化过程。接下来是服务端 Channel 的注册 Selector 的过程。

    @Override     protected void doRegister() throws Exception {         boolean selected = false;         for (;;) {             try {                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);                 return;             } catch (CancelledKeyException e) {                 if (!selected) {                     // Force the Selector to select now as the "canceled" SelectionKey may still be                     // cached and not removed because no Select.select(..) operation was called yet.                     eventLoop().selectNow();                     selected = true;                 } else {                     // We forced a select operation on the selector before but the SelectionKey is still cached                     // for whatever reason. JDK bug ?                     throw e;                 }             }         }     } 

在这个步骤中,我们可以看到关于 JDK 底层的操作

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 

首先拿到在前面过程中创建的 JDK 底层的 Channel,然后调用 JDK 的 register() 方法,将 this 也即 NioServerSocketChannel 对象当作 attachment 绑定到 JDK 的 Selector 上,这样后续从 Selector 拿到对应的事件之后,就可以把 Netty 领域的 Channel 拿出来。

接下来是服务端绑定端口的逻辑,见AbstractBootstrap中的doBind0方法

    private static void doBind0(             final ChannelFuture regFuture, final Channel channel,             final SocketAddress localAddress, final ChannelPromise promise) {          // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up         // the pipeline in its channelRegistered() implementation.         channel.eventLoop().execute(new Runnable() {             @Override             public void run() {                 if (regFuture.isSuccess()) {                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);                 } else {                     promise.setFailure(regFuture.cause());                 }             }         });     } 

图例

本文所有图例见:processon: Netty学习笔记

代码

hello-netty

更多内容见:Netty专栏

参考资料

跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

深度解析Netty源码

发表评论

相关文章