//Bootstrap sub-class which allows easy bootstrap of ServerChannel public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ... ... } //AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. //It support method-chaining to provide an easy way to configure the AbstractBootstrap. //When not used in a ServerBootstrap context, the #bind() methods are useful for connectionless transports such as datagram (UDP). public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { ... //Create a new Channel and bind it. public ChannelFuture bind(int inetPort) { //首先根据端口号创建一个InetSocketAddress对象,然后调用重载方法bind() return bind(new InetSocketAddress(inetPort)); } //Create a new Channel and bind it. public ChannelFuture bind(SocketAddress localAddress) { //验证服务启动需要的必要参数 validate(); if (localAddress == null) throw new NullPointerException("localAddress"); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); } private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1.初始化和注册Channel final Channel channel = regFuture.channel(); ... doBind0(regFuture, channel, localAddress, promise);//2.绑定服务端端口 ... return promise; } ... }
//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. //It support method-chaining to provide an easy way to configure the AbstractBootstrap. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { ... final ChannelFuture initAndRegister() { Channel channel = null; ... //1.创建服务端Channel channel = channelFactory.newChannel(); //2.初始化服务端Channel init(channel); ... //3.注册服务端Channel,比如通过NioEventLoopGroup的register()方法进行注册 ChannelFuture regFuture = config().group().register(channel); ... return regFuture; } ... }
//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. //It support method-chaining to provide an easy way to configure the AbstractBootstrap. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { private volatile ChannelFactory<? extends C> channelFactory; ... final ChannelFuture initAndRegister() { Channel channel = null; ... //1.创建服务端Channel channel = channelFactory.newChannel(); //2.初始化服务端Channel init(channel); ... //3.注册服务端Channel,比如通过NioEventLoopGroup的register()方法进行注册 ChannelFuture regFuture = config().group().register(channel); ... return regFuture; } ... } public interface ChannelFactory<T extends Channel> { //Creates a new channel. T newChannel(); }
//Example server that configures a ServerBootstrap, binds it to a port and waits for the channel to close.
public class NettyServer {
    ...

    //Configures the bootstrap, binds to the port and blocks until the bound channel is closed.
    public void start() throws Exception {
        ...
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)//The ServerSocketChannel that listens on the port
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {//Handles the SocketChannel of each client connection
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ...
                }
            });
        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//Block until the bind on the listening port has completed
        channelFuture.channel().closeFuture().sync();//Block until the bound server channel has been closed
        ...
    }
}

//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.
//It supports method-chaining to provide an easy way to configure the AbstractBootstrap.
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    //Factory used to create Channel instances; it may be assigned only once (see channelFactory(...) below)
    private volatile ChannelFactory<? extends C> channelFactory;
    ...

    //The Class which is used to create Channel instances from.
    //You either use this or #channelFactory(io.netty.channel.ChannelFactory) if your Channel implementation has no no-args constructor.
    //Wraps the class in a ReflectiveChannelFactory; throws NullPointerException("channelClass") for null.
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) throw new NullPointerException("channelClass");
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

    //Overload taking the io.netty.channel.ChannelFactory type; casts and delegates to the overload below.
    @SuppressWarnings({ "unchecked", "deprecation" })
    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }

    //Stores the factory used to create Channel instances and returns this bootstrap for chaining.
    //Throws NullPointerException("channelFactory") for null and
    //IllegalStateException("channelFactory set already") when a factory was stored before.
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) throw new NullPointerException("channelFactory");
        if (this.channelFactory != null) throw new IllegalStateException("channelFactory set already");
        this.channelFactory = channelFactory;
        return (B) this;
    }
    ...
}
//A ChannelFactory that instantiates a new Channel by invoking its default constructor reflectively. public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) throw new NullPointerException("clazz"); this.clazz = clazz; } @Override public T newChannel() { ... return clazz.newInstance(); } }
//A io.netty.channel.socket.ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections. public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private final ServerSocketChannelConfig config; ... //Create a new instance,默认的构造方法 public NioServerSocketChannel() { //调用newSocket()创建出一个ServerSocketChannel对象后,再调用有参构造方法 this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } private static ServerSocketChannel newSocket(SelectorProvider provider) { ... //创建一个ServerSocketChannel对象 //这个对象也就是JDK底层的Channel,即NIO的Socket return provider.openServerSocketChannel(); } //Create a new instance using the given ServerSocketChannel,重载的构造方法 public NioServerSocketChannel(ServerSocketChannel channel) { //传入要关心的ACCEPT事件 super(null, channel, SelectionKey.OP_ACCEPT); this.config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } ... }
//A io.netty.channel.socket.ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections. public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private final ServerSocketChannelConfig config; ... //Create a new instance using the given ServerSocketChannel,重载的构造方法 public NioServerSocketChannel(ServerSocketChannel channel) { //传入要关心的ACCEPT事件 super(null, channel, SelectionKey.OP_ACCEPT); this.config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } ... } //AbstractNioChannel} base class for Channels that operate on messages. public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } ... } //Abstract base class for Channel implementations which use a Selector based approach. public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch;//这是NIO中的Channel protected final int readInterestOp; ... //Create a new instance //@param parent,the parent Channel by which this instance was created. May be null. //@param ch,he underlying SelectableChannel on which it operates //@param readInterestOp,the ops to set to receive data from the SelectableChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); //NioServerSocketChannel.newSocket()方法通过JDK底层创建的Channel对象会被缓存在其父类AbstractNioChannel的变量ch中 //可以通过NioServerSocketChannel.javaChannel()方法获取其父类AbstractNioChannel的变量ch this.ch = ch; this.readInterestOp = readInterestOp; ... //设置Channel对象为非阻塞模式 ch.configureBlocking(false); ... } protected SelectableChannel javaChannel() { return ch; } ... }
//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. //It support method-chaining to provide an easy way to configure the AbstractBootstrap. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { ... final ChannelFuture initAndRegister() { Channel channel = null; ... //1.创建服务端Channel channel = channelFactory.newChannel(); //2.初始化服务端Channel init(channel); ... //3.注册服务端Channel,比如通过NioEventLoopGroup的register()方法进行注册 ChannelFuture regFuture = config().group().register(channel); ... return regFuture; } //init()方法的具体逻辑会由ServerBootstrap来实现 abstract void init(Channel channel) throws Exception; ... }
//AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. //It support method-chaining to provide an easy way to configure the AbstractBootstrap. public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { volatile EventLoopGroup group; ... final ChannelFuture initAndRegister() { Channel channel = null; ... //1.创建服务端Channel channel = channelFactory.newChannel(); //2.初始化服务端Channel init(channel); ... //3.注册服务端Channel并启动一个NioEventLoop线程,通过NioEventLoopGroup的register()方法进行注册 ChannelFuture regFuture = config().group().register(channel); ... return regFuture; } //Returns the AbstractBootstrapConfig object that can be used to obtain the current config of the bootstrap. public abstract AbstractBootstrapConfig<B, C> config(); //Returns the configured EventLoopGroup or null if non is configured yet. public final EventLoopGroup group() { return group; } ... } //Bootstrap sub-class which allows easy bootstrap of ServerChannel public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); ... @Override public final ServerBootstrapConfig config() { return config; } ... } public abstract class AbstractBootstrapConfig<B extends AbstractBootstrap<B, C>, C extends Channel> { protected final B bootstrap; ... protected AbstractBootstrapConfig(B bootstrap) { this.bootstrap = ObjectUtil.checkNotNull(bootstrap, "bootstrap"); } //Returns the configured EventLoopGroup or null if non is configured yet. public final EventLoopGroup group() { //比如返回一个NioEventLoopGroup对象 return bootstrap.group(); } ... }
//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels. public class NioEventLoopGroup extends MultithreadEventLoopGroup { ... ... } //Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time. public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { ... @Override public ChannelFuture register(Channel channel) { //先通过next()方法获取一个NioEventLoop,然后通过NioEventLoop.register()方法注册服务端Channel return next().register(channel); } @Override public EventLoop next() { return (EventLoop) super.next(); } ... } //SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop. public final class NioEventLoop extends SingleThreadEventLoop { ... ... } //Abstract base class for EventLoops that execute all its submitted tasks in a single thread. public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { ... @Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); } @Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); //调用AbstractUnsafe的register()方法 promise.channel().unsafe().register(this, promise); return promise; } ... }
//A skeletal Channel implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private volatile EventLoop eventLoop; ... //Unsafe implementation which sub-classes must extend and use. protected abstract class AbstractUnsafe implements Unsafe { ... @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... //绑定事件循环器,即绑定一个NioEventLoop到该Channel上 AbstractChannel.this.eventLoop = eventLoop; //注册Selector,并启动一个NioEventLoop if (eventLoop.inEventLoop()) { register0(promise); } else { ... //通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ... } } private void register0(ChannelPromise promise) { ... boolean firstRegistration = this.neverRegistered; //1.调用JDK底层注册服务端Channel到Selector上 doRegister(); this.neverRegistered = false; this.registered = true; //2.回调handlerAdded事件 this.pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //3.传播channelRegisterd事件到用户代码里 this.pipeline.fireChannelRegistered(); //4.其他逻辑 if (isActive()) { if (firstRegistration) { this.pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } ... } ... } //Is called after the Channel is registered with its EventLoop as part of the register process. //Sub-classes may override this method protected void doRegister() throws Exception { // NOOP } ... } //Abstract base class for Channel implementations which use a Selector based approach. public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch;//这是NIO中的Channel protected final int readInterestOp; volatile SelectionKey selectionKey; ... //Create a new instance //@param parent,the parent Channel by which this instance was created. May be null. 
//@param ch,he underlying SelectableChannel on which it operates //@param readInterestOp,the ops to set to receive data from the SelectableChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); //NioServerSocketChannel.newSocket()方法通过JDK底层创建的Channel对象会被缓存在其父类AbstractNioChannel的变量ch中 //可以通过NioServerSocketChannel.javaChannel()方法获取其父类AbstractNioChannel的变量ch this.ch = ch; this.readInterestOp = readInterestOp; ... //设置Channel对象为非阻塞模式 ch.configureBlocking(false); ... } @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { ... //首先获取前面创建的JDK底层NIO的Channel,然后调用JDK底层NIO的register()方法, //将this也就是NioServerSocketChannel对象当作attachment绑定到JDK的Selector上; //这样绑定是为了后续从Selector拿到对应的事件后,可以把Netty领域的Channel拿出来; //而且注册的ops值是0,表示此时还不关注任何事件; selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; ... } } protected SelectableChannel javaChannel() { return ch; } ... }
//Bootstrap sub-class which allows easy bootstrap of ServerChannel public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { ... ... } //AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. //It support method-chaining to provide an easy way to configure the AbstractBootstrap. //When not used in a ServerBootstrap context, the #bind() methods are useful for connectionless transports such as datagram (UDP). public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { ... //Create a new Channel and bind it. public ChannelFuture bind(int inetPort) { //首先根据端口号创建一个InetSocketAddress对象,然后调用重载方法bind() return bind(new InetSocketAddress(inetPort)); } //Create a new Channel and bind it. public ChannelFuture bind(SocketAddress localAddress) { //验证服务启动需要的必要参数 validate(); if (localAddress == null) throw new NullPointerException("localAddress"); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); } private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();//1.初始化和注册Channel final Channel channel = regFuture.channel(); ... doBind0(regFuture, channel, localAddress, promise);//2.绑定服务端端口 ... return promise; } private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { //This method is invoked before channelRegistered() is triggered. //Give user handlers a chance to set up the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } ... }