//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop. public final class NioEventLoop extends SingleThreadEventLoop { Selector selector; private SelectedSelectionKeySet selectedKeys; private boolean needsToSelectAgain; private int cancelledKeys; ... @Override protected void run() { for (;;) { ... //1.调用select()方法执行一次事件轮询 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } ... //2.处理产生IO事件的Channel needsToSelectAgain = false; processSelectedKeys(); ... //3.执行外部线程放入TaskQueue的任务 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } private void processSelectedKeys() { if (selectedKeys != null) { //selectedKeys.flip()会返回一个数组 processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } } private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { //1.首先取出IO事件 final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null;//Help GC //2.然后获取对应的Channel和处理该Channel //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { //网络事件的处理 processSelectedKey(k, (AbstractNioChannel) a); } else { //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } //3.最后判断是否应该再进行一次轮询 if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); //selectedKeys.flip()会返回一个数组 selectedKeys = this.selectedKeys.flip(); i = -1; } } } private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { //If the channel implementation throws an exception because there is no event loop, //we ignore this because we are only trying to determine if ch is registered to this event loop and thus has authority to close ch. return; } //Only close ch if ch is still registerd to this EventLoop. //ch could have deregistered from the event loop and thus the SelectionKey could be cancelled as part of the deregistration process, //but the channel is still healthy and should not be closed. if (eventLoop != this || eventLoop == null) { return; } //close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); //We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise //the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } //Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop //boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入 //此时将调用Channel的unsafe变量来进行实际操作 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //进行新连接接入处理 unsafe.read(); if (!ch.isOpen()) { //Connection already closed - no need to handle write. return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } ... }
//AbstractNioChannel base class for Channels that operate on messages. public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... private final class NioMessageUnsafe extends AbstractNioUnsafe { //临时存放读到的连接NioSocketChannel private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { //断言确保该read()方法必须来自Reactor线程调用 assert eventLoop().inEventLoop(); //获得Channel对应的Pipeline final ChannelPipeline pipeline = pipeline(); //获得Channel对应的RecvByteBufAllocator.Handle final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); do { //1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel //通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } } while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接 //2.设置并绑定NioSocketChannel int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } //3.清理容器并触发pipeline.fireChannelReadComplete() readBuf.clear(); pipeline.fireChannelReadComplete(); } } //Read messages into the given array and return the amount which was read. protected abstract int doReadMessages(List<Object> buf) throws Exception; ... }
//A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections. public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); private final ServerSocketChannelConfig config; ... @Override protected int doReadMessages(List<Object> buf) throws Exception { //1.创建JDK的Channel SocketChannel ch = javaChannel().accept(); //2.封装成Netty的Channel,即把服务端Channel和客户端Channel当作参数传递到NioSocketChannel的构造方法里 if (ch != null) { //先创建一个NioSocketChannel对象,再添加到buf里 buf.add(new NioSocketChannel(this, ch)); return 1; } return 0; } //Create a new instance public NioServerSocketChannel() { //创建服务端Channel this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } private static ServerSocketChannel newSocket(SelectorProvider provider) { //创建服务端Channel return provider.openServerSocketChannel(); } //Create a new instance using the given ServerSocketChannel. public NioServerSocketChannel(ServerSocketChannel channel) { //创建服务端Channel,关注ACCEPT事件 super(null, channel, SelectionKey.OP_ACCEPT); //javaChannel().socket()会调用JDK Channel的socket()方法 config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } @Override protected ServerSocketChannel javaChannel() { //返回一个JDK的Channel -> ServerSocketChannel return (ServerSocketChannel) super.javaChannel(); } ... } //AbstractNioChannel base class for Channels that operate on messages. public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... //创建服务端Channel protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); } @Override protected AbstractNioUnsafe newUnsafe() { return new NioMessageUnsafe(); } ... } //SocketChannel which uses NIO selector based implementation. public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { private final SocketChannelConfig config; ... //Create a new instance //@param parent,the Channel which created this instance or null if it was created by the user //@param socket,the SocketChannel which will be used public NioSocketChannel(Channel parent, SocketChannel socket) { //创建客户端Channel super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } @Override protected SocketChannel javaChannel() { //返回一个JDK的Channel -> ServerSocketChannel return (SocketChannel) super.javaChannel(); } private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket); } ... } ... } //The default SocketChannelConfig implementation. public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { protected final Socket javaSocket; //Creates a new instance. public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { ... this.javaSocket = javaSocket; setTcpNoDelay(true);//禁止Nagle算法 ... } ... } //AbstractNioChannel base class for Channels that operate on bytes. public abstract class AbstractNioByteChannel extends AbstractNioChannel { ... //Create a new instance //@param parent,the parent Channel by which this instance was created. May be null //@param ch,the underlying SelectableChannel on which it operates protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { //创建客户端Channel,关注READ事件 super(parent, ch, SelectionKey.OP_READ); } @Override protected AbstractNioUnsafe newUnsafe() { return new NioByteUnsafe(); } ... } //Abstract base class for Channel implementations which use a Selector based approach. public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch; protected final int readInterestOp; ... //Create a new instance //@param parent,the parent Channel by which this instance was created. May be null //@param ch,the underlying SelectableChannel on which it operates //@param readInterestOp,the ops to set to receive data from the SelectableChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; ch.configureBlocking(false); ... } protected SelectableChannel javaChannel() { return ch; } @Override public NioUnsafe unsafe() { return (NioUnsafe) super.unsafe(); } ... } //A skeletal Channel implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final Channel parent; private final ChannelId id; private final Unsafe unsafe; private final DefaultChannelPipeline pipeline; ... //Creates a new instance. //@param parent,the parent of this channel. null if there's no parent. protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } //Returns a new DefaultChannelId instance. //Subclasses may override this method to assign custom ChannelIds to Channels that use the AbstractChannel#AbstractChannel(Channel) constructor. protected ChannelId newId() { return DefaultChannelId.newInstance(); } //Create a new AbstractUnsafe instance which will be used for the life-time of the Channel protected abstract AbstractUnsafe newUnsafe(); //Returns a new DefaultChannelPipeline instance. protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); } @Override public Unsafe unsafe() { return unsafe; } @Override public ChannelPipeline pipeline() { return pipeline; } @Override public EventLoop eventLoop() { EventLoop eventLoop = this.eventLoop; if (eventLoop == null) throw new IllegalStateException("channel not registered to an event loop"); return eventLoop; } protected abstract class AbstractUnsafe implements Unsafe { @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... //绑定事件循环器,即绑定一个NioEventLoop到该Channel上 AbstractChannel.this.eventLoop = eventLoop; //注册Selector,并启动一个NioEventLoop if (eventLoop.inEventLoop()) { register0(promise); } else { ... //通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上 //其实执行的是SingleThreadEventExecutor的execute()方法 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ... } } ... } ... }
new NioSocketChannel(p, ch) //入口,客户端Channel是通过new关键字创建的,服务端Channel是通过反射的方式创建的 new AbstractNioByteChannel(p, ch) //逐层调用父类的构造方法 new AbstractNioChannel(p, ch, op_read) //逐层调用父类的构造方法 ch.configureBlocking(false) + save op //配置此Channel为非阻塞,以及将感兴趣的读事件保存到成员变量以方便后续注册到Selector new AbstractChannel() //创建Channel的相关组件: newId() //id作为Channel的唯一标识 newUnsafe() //unsafe用来进行底层数据读写 newChannelPipeline() //pipeline作为业务逻辑载体 new NioSocketChannelConfig() //创建和NioSocketChannel绑定的配置类 setTcpNoDelay(true) //禁止Nagle算法
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop. public final class NioEventLoop extends SingleThreadEventLoop { Selector selector; private SelectedSelectionKeySet selectedKeys; private boolean needsToSelectAgain; private int cancelledKeys; ... @Override protected void run() { for (;;) { ... //1.调用select()方法执行一次事件轮询 select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } ... //2.处理产生IO事件的Channel needsToSelectAgain = false; processSelectedKeys(); ... //3.执行外部线程放入TaskQueue的任务 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } private void processSelectedKeys() { if (selectedKeys != null) { //selectedKeys.flip()会返回一个数组 processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } } private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { //1.首先取出IO事件 final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null;//Help GC //2.然后获取对应的Channel和处理该Channel //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { //网络事件的处理 processSelectedKey(k, (AbstractNioChannel) a); } else { //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } //3.最后判断是否应该再进行一次轮询 if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); //selectedKeys.flip()会返回一个数组 selectedKeys = this.selectedKeys.flip(); i = -1; } } } private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); ... try { int readyOps = k.readyOps(); //We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise //the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { //remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } //Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { //Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } //Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop //boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入 //此时将调用Channel的unsafe变量来进行实际操作 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { //进行新连接接入处理 unsafe.read(); if (!ch.isOpen()) { //Connection already closed - no need to handle write. return; } } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } } ... } //AbstractNioChannel base class for Channels that operate on messages. public abstract class AbstractNioMessageChannel extends AbstractNioChannel { ... private final class NioMessageUnsafe extends AbstractNioUnsafe { //临时存放读到的连接NioSocketChannel private final List<Object> readBuf = new ArrayList<Object>(); @Override public void read() { //断言确保该read()方法必须来自Reactor线程调用 assert eventLoop().inEventLoop(); //获得Channel对应的Pipeline final ChannelPipeline pipeline = pipeline(); //获得Channel对应的RecvByteBufAllocator.Handle final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); do { //1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel //通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } } while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接 //2.设置并绑定NioSocketChannel int size = readBuf.size(); for (int i = 0; i < size; i ++) { //调用DefaultChannelPipeline的fireChannelRead()方法 //开始处理每个NioSocketChannel连接 pipeline.fireChannelRead(readBuf.get(i)); } //3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法 readBuf.clear(); //结束处理每个NioSocketChannel连接 pipeline.fireChannelReadComplete(); } } //Read messages into the given array and return the amount which was read. protected abstract int doReadMessages(List<Object> buf) throws Exception; ... }
//A skeletal Channel implementation. public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private final ChannelId id; private final Unsafe unsafe; private final DefaultChannelPipeline pipeline; private volatile EventLoop eventLoop; ... //Creates a new instance. protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } //Returns a new DefaultChannelPipeline instance. protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); } //Unsafe implementation which sub-classes must extend and use. protected abstract class AbstractUnsafe implements Unsafe { ... @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... //绑定事件循环器,即绑定一个NioEventLoop到该Channel上 AbstractChannel.this.eventLoop = eventLoop; //注册Selector,并启动一个NioEventLoop if (eventLoop.inEventLoop()) { register0(promise); } else { ... //通过启动这个NioEventLoop线程来调用register0()方法将这个Channel注册到Selector上 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); ... } } private void register0(ChannelPromise promise) { ... boolean firstRegistration = this.neverRegistered; //1.调用JDK底层注册Channel到Selector上 doRegister(); this.neverRegistered = false; this.registered = true; //2.配置自定义Handler this.pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); //3.传播channelRegisterd事件 this.pipeline.fireChannelRegistered(); //4.注册读事件 if (isActive()) { if (firstRegistration) { //会进入这个方法,传播完ChannelActive事件后,再注册读事件 this.pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } ... } @Override public final void beginRead() { ... //调用AbstractNioChannel实现的doBeginRead()方法 doBeginRead(); ... } ... } //Is called after the Channel is registered with its EventLoop as part of the register process. //Sub-classes may override this method protected void doRegister() throws Exception { // NOOP } //Schedule a read operation. protected abstract void doBeginRead() throws Exception; @Override public Channel read() { //调用DefaultChannelPipeline的read()方法 pipeline.read(); return this; } ... } //Abstract base class for Channel implementations which use a Selector based approach. public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch;//这是NIO中的Channel protected final int readInterestOp; volatile SelectionKey selectionKey; ... //Create a new instance //@param parent,the parent Channel by which this instance was created. May be null. //@param ch,he underlying SelectableChannel on which it operates //@param readInterestOp,the ops to set to receive data from the SelectableChannel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); //NioServerSocketChannel.newSocket()方法通过JDK底层创建的Channel对象会被缓存在其父类AbstractNioChannel的变量ch中 //可以通过NioServerSocketChannel.javaChannel()方法获取其父类AbstractNioChannel的变量ch this.ch = ch; this.readInterestOp = readInterestOp; ... //设置Channel对象为非阻塞模式 ch.configureBlocking(false); ... } @Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { ... //首先获取前面创建的JDK底层NIO的Channel,然后调用JDK底层NIO的register()方法, //将this也就是NioServerSocketChannel对象当作attachment绑定到JDK的Selector上; //这样绑定是为了后续从Selector拿到对应的事件后,可以把Netty领域的Channel拿出来; //而且注册的ops值是0,表示此时还不关注任何事件; selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; ... } } protected SelectableChannel javaChannel() { return ch; } @Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { //将SelectionKey.OP_READ读事件注册到Selector上,表示这个客户端Channel可以处理读事件了 selectionKey.interestOps(interestOps | readInterestOp); } } ... } //The default ChannelPipeline implementation. //It is usually created by a Channel implementation when the Channel is created. public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ... @Override public final ChannelPipeline fireChannelActive() { //调用HeadContext的channelActive()方法 AbstractChannelHandlerContext.invokeChannelActive(head); return this; } @Override public final ChannelPipeline read() { //从TailContext开始,最终会调用到HeadContext的read()方法 tail.read(); return this; } final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; ... @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive();//传播ChannelActive事件 readIfIsAutoRead(); } private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { //调用AbstractChannel的read()方法 channel.read(); } } @Override public void read(ChannelHandlerContext ctx) { //调用AbstractChannel.AbstractUnsafe的beginRead()方法 unsafe.beginRead(); } ... } }