Netty源码—1.服务端启动流程

大纲

1.服务端启动整体流程及关键方法

2.服务端启动的核心步骤

3.创建服务端Channel的源码

4.初始化服务端Channel的源码

5.注册服务端Channel的源码

6.绑定服务端端口的源码

7.服务端启动流程源码总结

 

1.服务端启动整体流程及关键方法

(1)关键方法

(2)整体流程

 

(1)关键方法

一.EventLoopGroup

服务端的线程模型外观类,Netty的线程模型是事件驱动的。也就是说,这个线程要做的事情就是不停地检测IO事件、处理IO事件、执行任务,并且不断重复这三个步骤。

 

二.ServerBootstrap

服务端的一个启动辅助类,通过给它设置一系列参数来绑定端口启动服务。

 

三.group(bossGroup, workerGroup)

设置服务端的线程模型,bossGroup的作用就是不断接收新的连接,并将新连接交给workerGroup来进行处理。

 

四.channel(NioServerSocketChannel.class)

设置服务端的IO类型为NIO,Netty是通过指定Channel的类型来指定IO类型的。Channel是Netty的一大组件,一个Channel就是一个连接或者一个服务端的bind动作。

 

五.handler()

表示在服务端的启动过程中,需要经过哪些流程。

 

六.childHandler()

设置ChannleHandler来处理每个连接上的数据。

 

七.ChannelFuture f = b.bind(8888).sync()

绑定端口并进行同步等待。绑定端口8888,等服务端启动完毕,才会进入下一行代码。

 

八.f.channel().closeFuture().sync()

等待服务端关闭端口绑定,这里的作用其实就是让程序不会退出。

 

九.bossGroup.shutdownGracefully()

关闭事件循环,关闭之后,main方法就结束了。

 

(2)整体流程

一.创建ServerBootstrap实例

ServerBootstrap是Netty服务端的启动辅助类,它提供了一系列方法用于设置服务端启动相关的参数。底层通过门面模式对各种能力进行抽象和封装,以让用户少和底层API交互,降低开发难度。ServerBootstrap只有一个无参的构造函数,它使用了Builder模式来处理参数过多的问题。

 

二.设置并绑定Reactor线程池

Netty的Reactor线程池是EventLoopGroup,而EventLoopGroup实际就是EventLoop的数组。EventLoop的职责是处理所有注册到本线程多路复用器Selector上的Channel。Selector的轮询操作是由其绑定的EventLoop线程run()方法驱动的,在一个循环体内循环执行。

 

三.设置并绑定服务端Channel

由于NIO服务端需要创建ServerSocketChannel,而Netty对NIO类库进行了封装,所以对应的就是NioServerSocketChannel。

 

Netty的ServerBootstrap方法提供了channel()方法用于指定服务端的Channel类型。Netty是通过工厂类(ServerBootstrap的父类AbstractBootstrap的ReflectiveChannelFactory实例),利用反射创建NioServerSocketChannel对象的。由于启动时才调用,所以该反射对运行时的性能没有影响。

 

四.创建并初始化ChannelPipeline

ChannelPipeline不是NIO服务端必需的,它本质是一个负责处理网络事件的职责链。ChannelPipeline这个职责链会负责管理和执行ChannelHandler。网络事件以事件流的形式在ChannelPipeline中流转,由ChannelPipeline根据ChannelHandler的执行策略来调度执行。

 

五.添加并设置ChannelHandler

ChannelHandler是Netty提供给用户定制和扩展的关键接口。利用ChannelHandler用户可以完成大多数的功能定制,如消息编解码、心跳、安全认证、流量控制和流量整形。

 

六.绑定并启动监听端口

在绑定监听端口之前,系统会做一系列的初始化和检测工作。完成之后便会启动监听端口,并将ServerSocketChannel注册到Selector上,然后监听客户端连接。

 

七.Selector轮询

由Reactor线程NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合。

 

八.执行ChannelPipeline和ChannelHandler

当轮询到准备就绪的Channel之后,就由Reactor线程NioEventLoop执行ChannelPipeline的相应方法。即ChannelPipeline会根据网络事件的类型调度并执行ChannelHandler,最终执行Netty自带的ChannelHandler或用户定制的ChannelHandler。

 

典型的网络事件有:

一.channelRegistered() 链路注册

二.channelActive() 链路激活

三.channelInActive() 链路断开

四.channelRead() 接收到请求消息

五.channelReadComplete() 处理完请求消息

六.exceptionCaugh() 链路发生异常

 

常用的ChannelHandler有:

一.ByteToMessageCodec 消息编解码Handler

二.LoggingHandler 码流日志打印Handler

三.SslHandler SSL安全认证Handler

四.IdleStateHandler 链路空闲检测Handler

五.LengthFieldBasedFrameDecoder 基于长度域的半包解码Handler

六.ChannelTrafficShapingHandler 进行流量整形的Handler

七.Base64Decoder和Base64Encoder Base64编解码Handler

 

2.服务端启动的核心步骤

(1)由启动辅助类的外观接口实现启动

(2)启动辅助类的bind()方法

(3)启动辅助类的initAndRegister()方法

(4)服务端启动的4个核心步骤

 

(1)由启动辅助类的外观接口实现启动

用户给启动辅助类ServerBootstrap设置好参数后,会通过它的外观接口来实现启动。

b.bind(8888).sync();

(2)启动辅助类的bind()方法

ServerBootstrap的bind()方法如下,来自其继承的抽象类AbstractBootstrap。

//Bootstrap sub-class which allows easy bootstrap of ServerChannel public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {     ...     ... }  //AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.  //It support method-chaining to provide an easy way to configure the AbstractBootstrap. //When not used in a ServerBootstrap context, the #bind() methods are useful for connectionless transports such as datagram (UDP). public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {     ...     //Create a new Channel and bind it.     public ChannelFuture bind(int inetPort) {         //首先根据端口号创建一个InetSocketAddress对象,然后调用重载方法bind()         return bind(new InetSocketAddress(inetPort));     }          //Create a new Channel and bind it.     public ChannelFuture bind(SocketAddress localAddress) {         //验证服务启动需要的必要参数         validate();         if (localAddress == null) throw new NullPointerException("localAddress");         return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));     }          private ChannelFuture doBind(final SocketAddress localAddress) {         final ChannelFuture regFuture = initAndRegister();//1.初始化和注册Channel         final Channel channel = regFuture.channel();         ...         doBind0(regFuture, channel, localAddress, promise);//2.绑定服务端端口         ...         return promise;     }     ... }

通过传入端口号调用AbstractBootstrap的bind()方法时,首先会根据端口号创建一个InetSocketAddress对象,然后继续调用重载方法bind()。重载方法bind()会先通过validate()方法验证服务启动需要的必要参数,然后调用doBind()方法。doBind()方法中的核心方法是:initAndRegister() + doBind0()。前者用于初始化和注册Channel,后者用于绑定服务端端口。

 

(3)启动辅助类的initAndRegister()方法

//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.  //It support method-chaining to provide an easy way to configure the AbstractBootstrap. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {     ...     final ChannelFuture initAndRegister() {         Channel channel = null;         ...         //1.创建服务端Channel         channel = channelFactory.newChannel();         //2.初始化服务端Channel         init(channel);         ...         //3.注册服务端Channel,比如通过NioEventLoopGroup的register()方法进行注册         ChannelFuture regFuture = config().group().register(channel);         ...         return regFuture;     }     ... }

(4)服务端启动的4个核心步骤

步骤一:创建服务端Channel

步骤二:初始化服务端Channel

步骤三:注册服务端Channel到Selector

步骤四:绑定服务端端口

 

3.创建服务端Channel的源码

(1)Channel的概念

(2)Channel的创建

(3)ChannelFactory的创建

(4)通过反射创建Channel对象

(5)创建JDK底层NIO的Channel

(6)创建Channel配置类

(7)设置Channel类型为非阻塞

(8创建Channel的核心组件

(9)创建服务端Channel总结

 

(1)Channel的概念

Netty官方对Channel的描述是:Channel可以理解为一个网络连接或者一个具有"读、写、连接、绑定"等IO操作能力的组件。Netty的Channel由于是在服务启动的时候创建的,可以和BIO中的ServerSocket对应,也和NIO中的ServerSocketChannel对应,所以符合上述IO组件的概念。

 

(2)Channel的创建

Channel是通过ChannelFactory的newChannel()方法创建出来的。

//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.  //It support method-chaining to provide an easy way to configure the AbstractBootstrap. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {     private volatile ChannelFactory<? extends C> channelFactory;     ...     final ChannelFuture initAndRegister() {         Channel channel = null;         ...         //1.创建服务端Channel         channel = channelFactory.newChannel();         //2.初始化服务端Channel         init(channel);         ...         //3.注册服务端Channel,比如通过NioEventLoopGroup的register()方法进行注册         ChannelFuture regFuture = config().group().register(channel);         ...         return regFuture;     }     ... }  public interface ChannelFactory<T extends Channel> {     //Creates a new channel.     T newChannel(); }

(3)ChannelFactory的创建

ChannelFactory是通过AbstractBootstrap的channel()方法创建出来的。

 

用户在调用启动辅助类的channel()方法时会将NioServerSocketChannel.class,作为ReflectiveChannelFactory的构造方法的参数,从而创建出一个ReflectiveChannelFactory对象,也就是ChannelFactory对象。

public class NettyServer {     ...     public void start() throws Exception {         ...         ServerBootstrap serverBootstrap = new ServerBootstrap();         serverBootstrap.group(bossGroup, workerGroup)         .channel(NioServerSocketChannel.class)//监听端口的ServerSocketChannel         .option(ChannelOption.SO_BACKLOG, 128)         .childOption(ChannelOption.SO_KEEPALIVE, true)         .childHandler(new ChannelInitializer<SocketChannel>() {//处理每个客户端连接的SocketChannel             @Override             protected void initChannel(SocketChannel socketChannel) throws Exception {                 ...             }         });         ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//同步等待启动服务器监控端口         channelFuture.channel().closeFuture().sync();//同步等待关闭启动服务器的结果         ...     } }  //AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.  //It support method-chaining to provide an easy way to configure the AbstractBootstrap. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {     private volatile ChannelFactory<? extends C> channelFactory;     ...     //The Class which is used to create Channel instances from.     //You either use this or #channelFactory(io.netty.channel.ChannelFactory) if your Channel implementation has no no-args constructor.     public B channel(Class<? extends C> channelClass) {         if (channelClass == null) throw new NullPointerException("channelClass");         return channelFactory(new ReflectiveChannelFactory<C>(channelClass));     }          @SuppressWarnings({ "unchecked", "deprecation" })     public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {         return channelFactory((ChannelFactory<C>) channelFactory);     }          public B channelFactory(ChannelFactory<? extends C> channelFactory) {         if (channelFactory == null) throw new NullPointerException("channelFactory");         if (this.channelFactory != null) throw new IllegalStateException("channelFactory set already");         this.channelFactory = channelFactory;         return (B) this;     }     ... }

(4)通过反射创建Channel对象

AbstractBootstrap.initAndRegister()方法中的channelFactory.newChannel()代码,最终调用的是ReflectiveChannelFactory.newChannel()方法,该方法会通过反射的方式创建出一个NioServerSocketChannel对象。

 

所以最终创建的服务端Channel相当于调用NioServerSocketChannel的默认构造函数来获得一个NioServerSocketChannel对象。

//A ChannelFactory that instantiates a new Channel by invoking its default constructor reflectively. public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {     private final Class<? extends T> clazz;     public ReflectiveChannelFactory(Class<? extends T> clazz) {         if (clazz == null) throw new NullPointerException("clazz");         this.clazz = clazz;     }      @Override     public T newChannel() {         ...         return clazz.newInstance();     } }

(5)创建JDK底层NIO的Channel

NioServerSocketChannel的默认构造方法会调用其newSocket()方法,而newSocket()方法中会通过SelectorProvider.openServerSocketChannel()方法创建一个ServerSocketChannel对象。这个对象也就是JDK底层的Channel,即NIO的Socket。

//A io.netty.channel.socket.ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections. public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {     private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();     private final ServerSocketChannelConfig config;     ...     //Create a new instance,默认的构造方法     public NioServerSocketChannel() {         //调用newSocket()创建出一个ServerSocketChannel对象后,再调用有参构造方法         this(newSocket(DEFAULT_SELECTOR_PROVIDER));     }          private static ServerSocketChannel newSocket(SelectorProvider provider) {         ...         //创建一个ServerSocketChannel对象         //这个对象也就是JDK底层的Channel,即NIO的Socket         return provider.openServerSocketChannel();     }          //Create a new instance using the given ServerSocketChannel,重载的构造方法     public NioServerSocketChannel(ServerSocketChannel channel) {         //传入要关心的ACCEPT事件         super(null, channel, SelectionKey.OP_ACCEPT);         this.config = new NioServerSocketChannelConfig(this, javaChannel().socket());     }     ... }

(6)创建Channel配置类

NioServerSocketChannel的默认构造方法还会调用其重载的构造方法;在重载的构造方法里,会创建一个NioServerSocketChannelConfig对象,其顶层接口为ChannelConfig。

 

(7)设置Channel类型为非阻塞

在NioServerSocketChannel的重载构造方法里,会逐层调用父类的构造方法,比如其中就会调用AbstractNioChannel的构造方法。

Netty源码—1.服务端启动流程

在AbstractNioChannel的构造方法中,会将前面provider.openServerSocketChannel()创建的ServerSocketChannel对象,保存到AbstractNioChannel的成员变量ch中,然后再将该Channel对象设置为非阻塞模式。以及将NioServerSocketChannel重载构造方法里传入的SelectionKey.OP_ACCEPT,设置到AbstractNioChannel的另一成员变量readInterestOp中,表示该Channel对象要关心ACCEPT事件。

 

注意:可以通过javaChannel()方法获取AbstractNioChannel的成员变量ch。

//A io.netty.channel.socket.ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections. public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {     private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();     private final ServerSocketChannelConfig config;     ...     //Create a new instance using the given ServerSocketChannel,重载的构造方法     public NioServerSocketChannel(ServerSocketChannel channel) {         //传入要关心的ACCEPT事件         super(null, channel, SelectionKey.OP_ACCEPT);         this.config = new NioServerSocketChannelConfig(this, javaChannel().socket());     }     ... }  //AbstractNioChannel} base class for Channels that operate on messages. public abstract class AbstractNioMessageChannel extends AbstractNioChannel {     ...     protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {         super(parent, ch, readInterestOp);     }     ... }  //Abstract base class for Channel implementations which use a Selector based approach. public abstract class AbstractNioChannel extends AbstractChannel {     private final SelectableChannel ch;//这是NIO中的Channel     protected final int readInterestOp;     ...     //Create a new instance     //@param parent,the parent Channel by which this instance was created. May be null.     //@param ch,he underlying SelectableChannel on which it operates     //@param readInterestOp,the ops to set to receive data from the SelectableChannel     protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {         super(parent);         //NioServerSocketChannel.newSocket()方法通过JDK底层创建的Channel对象会被缓存在其父类AbstractNioChannel的变量ch中         //可以通过NioServerSocketChannel.javaChannel()方法获取其父类AbstractNioChannel的变量ch         this.ch = ch;         this.readInterestOp = readInterestOp;         ...         //设置Channel对象为非阻塞模式         ch.configureBlocking(false);         ...     }          protected SelectableChannel javaChannel() {         return ch;     }     ... }

(8创建Channel的核心组件

AbstractNioChannel的构造方法中还会调用其父类AbstractChannel的构造方法。在AbstractChannel的构造方法中,Netty创建了三大组件,分别赋值到其成员变量中。第一个组件是ChannelId,第二个组件是Unsafe,第三个组件是ChannelPipeline。

//A skeletal Channel implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     private final Channel parent;     private final ChannelId id;     private final Unsafe unsafe;     private final DefaultChannelPipeline pipeline;     ...     //Creates a new instance.     //@param parent,the parent of this channel. null if there's no parent.     protected AbstractChannel(Channel parent) {         this.parent = parent;         this.id = newId();         this.unsafe = newUnsafe();         this.pipeline = newChannelPipeline();     }     ... }

(9)创建服务端Channel总结

用户调用启动辅助类ServerBootstrap的bind()方法时,第一步通过反射创建服务端Channel会执行NioServerSocketChannel的默认构造方法,来创建一个NioServerSocketChannel对象,并且在创建过程中会创建Netty的一系列核心组件:如Channel、ChannelConfig、ChannelId、Unsafe、ChannelPipeline。

 

创建服务端Channel的关键脉络如下:

ServerBootstrap.bind() //用户代码入口   AbstractBootstrap.initAndRegister() //初始化并注册Channel     channelFactory.newChannel() //创建服务端Channel       NioServerSocketChannel.newSocket() //通过JDK来创建JDK底层NIO的Channel       new NioServerSocketChannelConfig() //对底层NIO的Channel进行TCP参数配置       new AbstractNioChannel() //调用AbstractNioChannel的构造方法         configureBlocking(false) //设置NIO的Channel为非阻塞模式         new AbstractChannel() //调用AbstractChannel的构造方法创建组件:id、unsafe、pipeline

 

4.初始化服务端Channel的源码

(1)初始化服务端Channel的时机

(2)初始化服务端Channel的三项工作

(3)初始化服务端Channel总结

 

(1)初始化服务端Channel的时机

AbstractBootstrap的initAndRegister()方法执行channelFactory.newChannel()创建服务端Channel后,便会继续执行init(channel)对服务端Channel进行初始化。

//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.  //It support method-chaining to provide an easy way to configure the AbstractBootstrap. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {     ...     final ChannelFuture initAndRegister() {         Channel channel = null;         ...         //1.创建服务端Channel         channel = channelFactory.newChannel();         //2.初始化服务端Channel         init(channel);         ...         //3.注册服务端Channel,比如通过NioEventLoopGroup的register()方法进行注册         ChannelFuture regFuture = config().group().register(channel);         ...         return regFuture;     }          //init()方法的具体逻辑会由ServerBootstrap来实现     abstract void init(Channel channel) throws Exception;     ... }

(2)初始化服务端Channel的三项工作

AbstractBootstrap的init()方法只是一个抽象方法,具体的逻辑会在ServerBootstrap类中实现。

 

ServerBootstrap的init()方法初始化服务端Channel主要有三项工作:

一.设置服务端Channel的Option与Attr

二.设置客户端Channel的Option与Attr

三.配置服务端启动逻辑

 

其中,Netty把服务端启动过程中需要执行的启动逻辑分为两部分。一部分是添加用户自定义的处理逻辑到服务端启动流程,另一部分是添加一个特殊的逻辑处理ServerBootstrapAcceptor。

 

ServerBootstrapAcceptor是一个接入器,用来接收新请求以及把新请求传递给某个事件循环器。

//Bootstrap sub-class which allows easy bootstrap of ServerChannel public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {     ...     @Override     void init(Channel channel) throws Exception {         //1.设置服务端Channel的Option与Attr         final Map<ChannelOption<?>, Object> options = options0();         synchronized (options) {             channel.config().setOptions(options);         }         final Map<AttributeKey<?>, Object> attrs = attrs0();         synchronized (attrs) {             for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {                 @SuppressWarnings("unchecked")                 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();                 channel.attr(key).set(e.getValue());             }         }                 //2.设置客户端Channel的Option与Attr         final EventLoopGroup currentChildGroup = childGroup;         final ChannelHandler currentChildHandler = childHandler;         final Entry<ChannelOption<?>, Object>[] currentChildOptions;         final Entry<AttributeKey<?>, Object>[] currentChildAttrs;         synchronized (childOptions) {             currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));         }         synchronized (childAttrs) {             currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));         }                 //3.配置服务端启动逻辑         ChannelPipeline p = channel.pipeline();         //p.addLast()用于定义服务端启动过程中需要执行哪些逻辑         p.addLast(new ChannelInitializer<Channel>() {             @Override             public void initChannel(Channel ch) throws Exception {                 //一.添加用户自定义的Handler,注意这是handler,而不是childHandler                 final ChannelPipeline pipeline = ch.pipeline();                 ChannelHandler handler = config.handler();                 if (handler != null) pipeline.addLast(handler);                 //二.添加一个特殊的Handler用于接收新连接                 //自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptor                 ch.eventLoop().execute(new Runnable() {                     @Override                     public void run() {                         pipeline.addLast(new ServerBootstrapAcceptor(                             currentChildGroup,                              currentChildHandler,                              currentChildOptions,                              currentChildAttrs)                         );                     }                 });             }         });     }     ... }

(3)初始化服务端Channel总结

AbstractBootstrap的initAndRegister()方法调用的ServerBootstrap实现的init()方法并没有启动服务,只是初始化一些基本配置和属性,以及在服务端启动逻辑中加入一个ServerBootstrapAcceptor接入器,用来专门接收新连接。

ServerBootstrap.bind() //用户代码入口   AbstractBootstrap.initAndRegister() //初始化并注册Channel     channelFactory.newChannel() //创建服务端Channel     ServerBootstrap.init() //初始化服务端Channel       setChannelOptions、setChannelAttrs //设置服务端的Option与Attr       setChildOptions、setChildAttrs //设置客户端的Option与Attr       config.handler() //设置服务端pipeline       addServerBootstrapAcceptor //添加接入器

 

5.注册服务端Channel的源码

(1)注册服务端Channel的入口

(2)注册Selector的主要步骤

(3)注册服务端Channel总结

 

(1)注册服务端Channel的入口

首先AbstractBootstrap的config()方法是一个抽象方法,会由ServerBootstrap来实现。

 

ServerBootstrap的config()方法会返回一个封装了ServerBootstrap对象的ServerBootstrapConfig对象。所以执行代码config().group()时会调用AbstractBootstrapConfig的group()方法,也就是执行ServerBootstrap的group()方法返回用户通过group()方法设置的一个NioEventLoopGroup对象。因此config().group().register(channel)最后会调用NioEventLoopGroup的register()方法。

//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.  //It support method-chaining to provide an easy way to configure the AbstractBootstrap. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {     volatile EventLoopGroup group;     ...     final ChannelFuture initAndRegister() {         Channel channel = null;         ...         //1.创建服务端Channel         channel = channelFactory.newChannel();         //2.初始化服务端Channel         init(channel);         ...         //3.注册服务端Channel并启动一个NioEventLoop线程,通过NioEventLoopGroup的register()方法进行注册         ChannelFuture regFuture = config().group().register(channel);         ...         return regFuture;     }     //Returns the AbstractBootstrapConfig object that can be used to obtain the current config of the bootstrap.     public abstract AbstractBootstrapConfig<B, C> config();     //Returns the configured EventLoopGroup or null if non is configured yet.     public final EventLoopGroup group() {         return group;     }     ... }  //Bootstrap sub-class which allows easy bootstrap of ServerChannel public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {     private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);     ...     @Override     public final ServerBootstrapConfig config() {         return config;     }     ... }  public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> {     protected final B bootstrap;     ...     protected AbstractBootstrapConfig(B bootstrap) {         this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap");     }     //Returns the configured EventLoopGroup or null if non is configured yet.     public final EventLoopGroup group() {         //比如返回一个NioEventLoopGroup对象         return bootstrap.group();     }     ... }

NioEventLoopGroup继承自抽象类MultithreadEventLoopGroup,调用NioEventLoopGroup的register()方法也就是调用MultithreadEventLoopGroup的register()方法。

 

调用NioEventLoopGroup的register()方法时,会先通过next()方法获取一个NioEventLoop对象,然后再调用NioEventLoop的register()方法。而调用NioEventLoop的register()方法,其实就是调用抽象类SingleThreadEventLoop的register()方法。

 

在SingleThreadEventLoop的register()方法中,promise.channel().unsafe()会返回一个Channel.Unsafe类型的对象。而AbstractChannel实现了Channel接口,AbstractChannel的内部类AbstractUnsafe也实现了Channel接口的内部接口Unsafe。

 

所以promise.channel().unsafe().register(this, promise)最后会调用AbstractUnsafe的register()方法。

//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels. public class NioEventLoopGroup extends MultithreadEventLoopGroup {     ...     ... }  //Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time. public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {     ...     @Override     public ChannelFuture register(Channel channel) {         //先通过next()方法获取一个NioEventLoop,然后通过NioEventLoop.register()方法注册服务端Channel         return next().register(channel);     }      @Override     public EventLoop next() {         return (EventLoop) super.next();     }     ... }  //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop. public final class NioEventLoop extends SingleThreadEventLoop {     ...     ... }  //Abstract base class for EventLoops that execute all its submitted tasks in a single thread. public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {     ...     @Override     public ChannelFuture register(Channel channel) {         return register(new DefaultChannelPromise(channel, this));     }      @Override     public ChannelFuture register(final ChannelPromise promise) {         ObjectUtil.checkNotNull(promise, "promise");         //调用AbstractUnsafe的register()方法         promise.channel().unsafe().register(this, promise);         return promise;     }     ... }

所以注册服务端Channel的关键逻辑其实就体现在AbstractUnsafe的register()方法上。该方法会先将EventLoop事件循环器绑定到服务端Channel即NioServerSocketChanel上,然后再调用AbstractUnsafe的register0()方法将服务端Channel注册到Selector上。

//A skeletal Channel implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     private volatile EventLoop eventLoop;     ...     //Unsafe implementation which sub-classes must extend and use.     protected abstract class AbstractUnsafe implements Unsafe {         ...         @Override         public final void register(EventLoop eventLoop, final ChannelPromise promise) {             ...             //绑定事件循环器,即绑定一个NioEventLoop到该Channel上             AbstractChannel.this.eventLoop = eventLoop;             //注册Selector,并启动一个NioEventLoop             if (eventLoop.inEventLoop()) {                 register0(promise);             } else {                 ...                 //通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上                 eventLoop.execute(new Runnable() {                     @Override                     public void run() {                         register0(promise);                     }                 });                 ...             }         }          private void register0(ChannelPromise promise) {             ...         }         ...     }     ... }

注意:AbstractUnsafe的register()方法会将前面获取到的一个NioEventLoop事件循环器绑定到服务端Channel上,之后便可以通过channel.eventLoop()来取出这个NioEventLoop事件循环器了。因此,一个服务端Channel对应一个NioEventLoop事件循环器。此外,会通过启动一个NioEventLoop线程来调用register0()方法将服务端Channel注册到Selector上。

 

总结:创建服务端Channel后,就会从NioEventLoopGroup中获取一个NioEventLoop出来进行绑定,并启动这个NioEventLoop线程将这个服务端Channel注册到Selector上以及执行线程的run()逻辑监听事件等。

 

(2)注册Selector的主要步骤

AbstractUnsafe.register0()方法主要有4个步骤。

 

步骤一:调用JDK底层注册服务端Channel到Selector上

doRegister()方法是由AbstractChannel的子类AbstractNioChannel来实现的。

 

在AbstractNioChannel的doRegister()方法中,首先获取前面创建的JDK底层NIO的Channel,然后调用JDK底层NIO的register()方法,将this也就是NioServerSocketChannel对象当作attachment绑定到JDK的Selector上。这样绑定是为了后续从Selector拿到对应的事件后,可以把Netty领域的Channel拿出来。而且注册的ops值是0,表示此时还不关注任何事件。

 

步骤二:回调handlerAdded事件

步骤三:传播channelRegisterd事件

步骤四:其他逻辑

//A skeletal Channel implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     private volatile EventLoop eventLoop;     ...     //Unsafe implementation which sub-classes must extend and use.     protected abstract class AbstractUnsafe implements Unsafe {         ...         @Override         public final void register(EventLoop eventLoop, final ChannelPromise promise) {             ...             //绑定事件循环器,即绑定一个NioEventLoop到该Channel上             AbstractChannel.this.eventLoop = eventLoop;             //注册Selector,并启动一个NioEventLoop             if (eventLoop.inEventLoop()) {                 register0(promise);             } else {                 ...                 //通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上                 eventLoop.execute(new Runnable() {                     @Override                     public void run() {                         register0(promise);                     }                 });                 ...             }         }          private void register0(ChannelPromise promise) {             ...             boolean firstRegistration = this.neverRegistered;             //1.调用JDK底层注册服务端Channel到Selector上             doRegister();             this.neverRegistered = false;             this.registered = true;             //2.回调handlerAdded事件             this.pipeline.invokeHandlerAddedIfNeeded();             safeSetSuccess(promise);             //3.传播channelRegisterd事件到用户代码里             this.pipeline.fireChannelRegistered();             //4.其他逻辑             if (isActive()) {                 if (firstRegistration) {                     this.pipeline.fireChannelActive();                 } else if (config().isAutoRead()) {                     beginRead();                 }             }             ...         }         ...     }          //Is called after the Channel is registered with its EventLoop as part of the register process.     //Sub-classes may override this method     protected void doRegister() throws Exception {         // NOOP     }     ... }  //Abstract base class for Channel implementations which use a Selector based approach. public abstract class AbstractNioChannel extends AbstractChannel {     private final SelectableChannel ch;//这是NIO中的Channel     protected final int readInterestOp;     volatile SelectionKey selectionKey;     ...     //Create a new instance     //@param parent,the parent Channel by which this instance was created. May be null.     //@param ch,he underlying SelectableChannel on which it operates     //@param readInterestOp,the ops to set to receive data from the SelectableChannel     protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {         super(parent);         //NioServerSocketChannel.newSocket()方法通过JDK底层创建的Channel对象会被缓存在其父类AbstractNioChannel的变量ch中         //可以通过NioServerSocketChannel.javaChannel()方法获取其父类AbstractNioChannel的变量ch         this.ch = ch;         this.readInterestOp = readInterestOp;         ...         //设置Channel对象为非阻塞模式         ch.configureBlocking(false);         ...     }          @Override     protected void doRegister() throws Exception {         boolean selected = false;         for (;;) {             ...             //首先获取前面创建的JDK底层NIO的Channel,然后调用JDK底层NIO的register()方法,             //将this也就是NioServerSocketChannel对象当作attachment绑定到JDK的Selector上;             //这样绑定是为了后续从Selector拿到对应的事件后,可以把Netty领域的Channel拿出来;             //而且注册的ops值是0,表示此时还不关注任何事件;             selectionKey = javaChannel().register(eventLoop().selector, 0, this);             return;             ...         }     }              protected SelectableChannel javaChannel() {         return ch;     }     ... }

(3)注册服务端Channel总结

注册服务端Channel的入口是AbstractChannel的内部类AbstractUnsafe的register()方法。

 

首先会把一个NioEventLoop线程和当前的Channel进行绑定,然后再调用AbstractUnsafe的register0()方法进行注册。而register0()方法会把前面创建的JDK底层NIO的Channel注册到Selector上,并且把Netty领域的Channel当作一个attachment绑定到Selector上去,最后回调handlerAdded事件以及传播channelRegistered事件到用户代码里。

ServerBootstrap.bind() //用户代码入口   AbstractBootstrap.initAndRegister() //初始化并注册Channel     channelFactory.newChannel() //创建服务端Channel     ServerBootstrap.init() //初始化服务端Channel     NioEventLoopGroup.register() //注册服务端Channel       NioEventLoop.register() //注册服务端Channel         AbstractChannel.AbstractUnsafe.register() //注册Channel入口           this.eventLoop = eventLoop //将Channel绑定NioEventLoop线程           AbstractChannel.AbstractUnsafe.register0() //实际注册             AbstractNioChannel.doRegister() //调用JDK底层注册Channel到Selector             invokeHandlerAddedIfNeeded() //回调handlerAdded事件             fireChannelRegistered() //传播channelRegistered事件

补充说明一:Java类是单继承的,Java接口却是多继承的。因为前者不能区分父类相同名字方法要用哪一个,后者则由于还没实现接口,即使父类有相同名字接口也不影响。

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {     ... }  public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {     ... }

补充说明二:如果监听一个端口,就创建一个服务端Channel。如果监听多个端口,就创建多个服务端Channel。

 

每个Channel绑定于NioEventLoopGroup的next()方法返回的一个NioEventLoop。

 

6.绑定服务端端口的源码

(1)绑定服务端端口的时机

(2)AbstractUnsafe.bind()方法的主要工作

(3)调用JDK底层绑定端口

(4)传播ChannelActive事件

(5)注册ACCEPT事件到Selector

(6)绑定服务端端口总结

 

(1)绑定服务端端口的时机

ServerBootstrap的bind()方法,首先执行AbstractBootstrap的initAndRegister()方法完成了服务端Channel的初始化和注册后,就会调用AbstractBootstrap的doBind0()方法绑定端口。

//Bootstrap sub-class which allows easy bootstrap of ServerChannel public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {     ...     ... }  //AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.  //It support method-chaining to provide an easy way to configure the AbstractBootstrap. //When not used in a ServerBootstrap context, the #bind() methods are useful for connectionless transports such as datagram (UDP). public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {     ...     //Create a new Channel and bind it.     public ChannelFuture bind(int inetPort) {         //首先根据端口号创建一个InetSocketAddress对象,然后调用重载方法bind()         return bind(new InetSocketAddress(inetPort));     }          //Create a new Channel and bind it.     public ChannelFuture bind(SocketAddress localAddress) {         //验证服务启动需要的必要参数         validate();         if (localAddress == null) throw new NullPointerException("localAddress");         return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));     }          private ChannelFuture doBind(final SocketAddress localAddress) {         final ChannelFuture regFuture = initAndRegister();//1.初始化和注册Channel         final Channel channel = regFuture.channel();         ...         doBind0(regFuture, channel, localAddress, promise);//2.绑定服务端端口         ...         return promise;     }          private static void doBind0(final ChannelFuture regFuture, final Channel channel,         final SocketAddress localAddress, final ChannelPromise promise) {          //This method is invoked before channelRegistered() is triggered.         //Give user handlers a chance to set up the pipeline in its channelRegistered() implementation.         channel.eventLoop().execute(new Runnable() {             @Override             public void run() {                 if (regFuture.isSuccess()) {                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);                 } else {                     promise.setFailure(regFuture.cause());                 }             }         });     }     ... }

(2)AbstractUnsafe.bind()方法的主要工作

AbstractBootstrap的doBind0()方法会执行代码channel.bind(),这个channel其实就是通过channelFactory工厂反射生成的NioServerSocketChannel。

 

所以执行channel.bind()其实就是执行AbstractChannel的bind()方法。经过逐层调用,最后会落到调用AbstractChannel内部类AbstractUnsafe的bind()方法。

 

AbstractUnsafe的bind()方法主要做两件事:

一.调用JDK底层绑定端口

二.传播channelActive事件并注册ACCEPT事件

//A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections. public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {     ... }  //AbstractNioChannel base class for Channels that operate on messages. public abstract class AbstractNioMessageChannel extends AbstractNioChannel {     ... }  //Abstract base class for Channel implementations which use a Selector based approach. public abstract class AbstractNioChannel extends AbstractChannel {     ... }  //A skeletal {@link Channel} implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     ...     private final DefaultChannelPipeline pipeline;     @Override     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {         return pipeline.bind(localAddress, promise);     }     ... }  //The default ChannelPipeline implementation.  //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline {     final AbstractChannelHandlerContext head;     final AbstractChannelHandlerContext tail;     ...     @Override     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {         return tail.bind(localAddress, promise);     }     ... }  abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {     ...     @Override     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {         if (localAddress == null) throw new NullPointerException("localAddress");         if (!validatePromise(promise, false)) return promise;         final AbstractChannelHandlerContext next = findContextOutbound();         EventExecutor executor = next.executor();         if (executor.inEventLoop()) {             next.invokeBind(localAddress, promise);         } else {             safeExecute(executor, new Runnable() {                 @Override                 public void run() {                     next.invokeBind(localAddress, promise);                 }             }, promise, null);         }         return promise;     }          private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {         if (invokeHandler()) {             try {                 //执行DefaultChannelPipeline.HeadContext的bind()方法                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);             } catch (Throwable t) {                 notifyOutboundHandlerException(t, promise);             }         } else {             bind(localAddress, promise);         }     }     ... }  //The default ChannelPipeline implementation.  //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline {     ...     final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {         private final Unsafe unsafe;         HeadContext(DefaultChannelPipeline pipeline) {             super(pipeline, null, HEAD_NAME, false, true);             unsafe = pipeline.channel().unsafe();             setAddComplete();         }         ...         @Override         public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {             //执行AbstractChannel内部类AbstractUnsafe的bind()方法             unsafe.bind(localAddress, promise);         }         ...     }     ... }  //A skeletal {@link Channel} implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     private final DefaultChannelPipeline pipeline;     ...     //Unsafe implementation which sub-classes must extend and use.     protected abstract class AbstractUnsafe implements Unsafe {         ...         @Override         public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {             ...             boolean wasActive = isActive();             try {                 //1.调用JDK底层绑定端口                 doBind(localAddress);             } catch (Throwable t) {                 safeSetFailure(promise, t);                 closeIfClosed();                 return;             }              if (!wasActive && isActive()) {                 invokeLater(new Runnable() {                     @Override                     public void run() {                         //2.传播channelActive事件并注册ACCEPT事件                         pipeline.fireChannelActive();                     }                 });             }             safeSetSuccess(promise);         }         ...     }     ...     //Bind the Channel to the SocketAddress     protected abstract void doBind(SocketAddress localAddress) throws Exception;     ... }

(3)调用JDK底层绑定端口

AbstractUnsafe的bind()方法中所调用的doBind()方法是属于AbstractChannel的抽象接口,会由NioServerSocketChannel来进行具体的实现,即调用JDK底层NIO的bind()方法来绑定端口。

//A skeletal {@link Channel} implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     ...     //Bind the Channel to the SocketAddress     protected abstract void doBind(SocketAddress localAddress) throws Exception;     ... }  //A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections. public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {     ...     @Override     protected void doBind(SocketAddress localAddress) throws Exception {         if (PlatformDependent.javaVersion() >= 7) {             javaChannel().bind(localAddress, config.getBacklog());         } else {             javaChannel().socket().bind(localAddress, config.getBacklog());         }     }          @Override     protected ServerSocketChannel javaChannel() {         return (ServerSocketChannel) super.javaChannel();     }     ... }  //Abstract base class for Channel implementations which use a Selector based approach. public abstract class AbstractNioChannel extends AbstractChannel {     private final SelectableChannel ch;//这是NIO中的Channel     ...     protected SelectableChannel javaChannel() {         return ch;     }     ... }

(4)传播ChannelActive事件

绑定完端口后,就会执行代码pipeline.fireChannelActive(),也就是调用DefaultChannelPipeline.fireChannelActive()。

 

最后会调用DefaultChannelPipeline.HeadContext的channelActive()方法传播channelActive事件。

//The default ChannelPipeline implementation.   //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline {     final AbstractChannelHandlerContext head;     final AbstractChannelHandlerContext tail;     ...     @Override     public final ChannelPipeline fireChannelActive() {         AbstractChannelHandlerContext.invokeChannelActive(head);         return this;     }     ... }  abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {     ...     static void invokeChannelActive(final AbstractChannelHandlerContext next) {         EventExecutor executor = next.executor();         if (executor.inEventLoop()) {             next.invokeChannelActive();         } else {             executor.execute(new Runnable() {                 @Override                 public void run() {                     next.invokeChannelActive();                 }             });         }     }          private void invokeChannelActive() {         if (invokeHandler()) {             try {                 //执行DefaultChannelPipeline.HeadContext的channelActive()方法                 ((ChannelInboundHandler) handler()).channelActive(this);             } catch (Throwable t) {                 notifyHandlerException(t);             }         } else {             fireChannelActive();         }     } }  //The default ChannelPipeline implementation.  //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline {     ...     final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {         ...         @Override         public void channelActive(ChannelHandlerContext ctx) throws Exception {             //1.传播channelActive事件             ctx.fireChannelActive();             //2.注册ACCEPT事件             readIfIsAutoRead();         }         ...     } }

(5)注册ACCEPT事件到Selector

传播完channelActive事件后,便会调用HeadContext.readIfIsAutoRead()方法。然后逐层调用到AbstractChannel内部类AbstractUnsafe的beginRead()方法,并最终调用到AbstractNioChannel的doBeginRead()方法来注册ACCEPT事件。

//The default ChannelPipeline implementation.  //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline {     private final Channel channel;     ...     final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {         ...         private void readIfIsAutoRead() {             //isAutoRead()方法默认会返回true             if (channel.config().isAutoRead()) {                 //调用AbstractChannel的read()方法                 channel.read();             }         }         ...     } }  //A skeletal {@link Channel} implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     private final DefaultChannelPipeline pipeline;     ...     @Override     public Channel read() {         pipeline.read();         return this;     }     ... }  //The default ChannelPipeline implementation.  //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline {     final AbstractChannelHandlerContext head;     final AbstractChannelHandlerContext tail;     ...     @Override     public final ChannelPipeline read() {         tail.read();         return this;     }     ... }  abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {     ...     @Override     public ChannelHandlerContext read() {         final AbstractChannelHandlerContext next = findContextOutbound();         EventExecutor executor = next.executor();         if (executor.inEventLoop()) {             next.invokeRead();         } else {             Runnable task = next.invokeReadTask;             if (task == null) {                 next.invokeReadTask = task = new Runnable() {                     @Override                     public void run() {                         next.invokeRead();                     }                 };             }             executor.execute(task);         }         return this;     }      private void invokeRead() {         if (invokeHandler()) {             try {                 //执行DefaultChannelPipeline.HeadContext的read()方法                 ((ChannelOutboundHandler) handler()).read(this);             } catch (Throwable t) {                 notifyHandlerException(t);             }         } else {             read();         }     }     ... }  //The default ChannelPipeline implementation.  //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline {     ...     final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {         private final Unsafe unsafe;         HeadContext(DefaultChannelPipeline pipeline) {             super(pipeline, null, HEAD_NAME, false, true);             unsafe = pipeline.channel().unsafe();             setAddComplete();         }         ...         @Override         public void read(ChannelHandlerContext ctx) {             unsafe.beginRead();         }         ...     } }  //A skeletal {@link Channel} implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {     private final DefaultChannelPipeline pipeline;     ...     //Unsafe implementation which sub-classes must extend and use.     protected abstract class AbstractUnsafe implements Unsafe {         ...         @Override         public final void beginRead() {             assertEventLoop();             if (!isActive()) return;             try {                 doBeginRead();             } catch (final Exception e) {                 invokeLater(new Runnable() {                     @Override                     public void run() {                         pipeline.fireExceptionCaught(e);                     }                 });                 close(voidPromise());             }         }         ...     }          //Schedule a read operation.     protected abstract void doBeginRead() throws Exception;     ... }  //Abstract base class for Channel implementations which use a Selector based approach. public abstract class AbstractNioChannel extends AbstractChannel {     protected final int readInterestOp;     volatile SelectionKey selectionKey;     boolean readPending;     ...     @Override     protected void doBeginRead() throws Exception {         //Channel.read() or ChannelHandlerContext.read() was called         //this.selectionKey就是前面注册服务端Channel时返回的对象         //注册服务端Channel时,注册ops的值是0,表示还不关注任何事件         final SelectionKey selectionKey = this.selectionKey;         if (!selectionKey.isValid()) return;          readPending = true;         final int interestOps = selectionKey.interestOps();         //这里的readInterestOp就是前面newChannel()时传入的SelectionKey.OP_ACCEPT         //所以这样要做的工作就是,告诉JDK的Selector一切工作准备就绪,只剩下把ACCEPT事件注册到Selector上         if ((interestOps & readInterestOp) == 0) {             //关注ACCEPT事件             selectionKey.interestOps(interestOps | readInterestOp);         }     }     ... }

(6)绑定服务端端口总结

绑定服务端端口,最终会调用JDK底层API去进行实际绑定。绑定端口成功后,会由DefaultChannelPipeline传播channelActive事件,以及把ACCEPT事件注册到Selector上,从而可以通过Selector监听新连接的接入。

ServerBootstrap.bind() //用户代码入口   AbstractBootstrap.initAndRegister() //初始化并注册Channel     channelFactory.newChannel() //创建服务端Channel     ServerBootstrap.init() //初始化服务端Channel     NioEventLoopGroup.register() //注册服务端Channel   AbstractBootstrap.doBind0() //绑定服务端端口     AbstractChannel.AbstractUnsafe.bind() //绑定服务端端口入口       NioServerSocketChannel.doBind() //NioServerSocketChannel实现         javaChannel().bind() //JDK底层API绑定端口       DefaultChannelPipeline.fireChannelActive() //传播channelActive事件         HeadContext.readIfIsAutoRead() //注册ACCEPT事件到Selector上

 

7.服务端启动流程源码总结

initAndRegister()里的newChannel()会通过反射创建JDK底层Channel,同时会创建该Channel对应的Config对象并设置该Channel为非阻塞模式。总之,创建服务端Channel时会完成Netty几大基本组件的创建。如Channel、ChannelConfig、ChannelId、Unsafe、ChannelPipeline。

 

初始化服务端Channel时,会设置服务端Channel和客户端Channel的Option和Attr,并且给服务端Channel添加连接接入器ServerBootstrapAcceptor用于接收新连接。

 

注册服务端Channel时,会调用JDK底层的API将Channel注册到Selector,同时将Netty领域的Channel当作attachment注册到Selector上,并且回调handlerAdded事件和传播channelRegistered事件到其他用户代码中。

 

绑定服务端端口时,会调用JDK底层API进行端口绑定并传播channelActive事件。当channelActive事件被传播后,才真正进行有效的服务端端口绑定,也就是把ACCEPT事件注册到Selector上。

 

发表评论

评论已关闭。

相关文章

  • 0