Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明

Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明

作者: Grey

原文地址:

博客园:Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明

CSDN:Netty 学习(七):NioEventLoop 对应线程的创建和启动源码说明

说明

在 Netty 服务端代码中,我们一般会创建了两个 NioEventLoopGroup:bossGroup 和 workerGroup

其中: bossGroup用于监听端口,接收新连接的线程组;workerGroup 用于处理每一个连接的数据读写的线程组。

bossGroup 创建第一个 NioEventLoop 线程

NioEventLoop 的启动入口在AbstractUnsafe

        @Override         public final void register(EventLoop eventLoop, final ChannelPromise promise) {             ......             AbstractChannel.this.eventLoop = eventLoop;              if (eventLoop.inEventLoop()) {                 register0(promise);             } else {                 try {                     eventLoop.execute(new Runnable() {                         @Override                         public void run() {                             register0(promise);                         }                     });                 } catch (Throwable t) {                     logger.warn(                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",                             AbstractChannel.this, t);                     closeForcibly();                     closeFuture.setClosed();                     safeSetFailure(promise, t);                 }             }         } 

其中inEventLoop()方法调用的是AbstractEventExecutor的实现

    @Override     public boolean inEventLoop() {         return inEventLoop(Thread.currentThread());     } 

而这个实现又调用了子类SingleThreadEventExecutor的如下方法

    @Override     public boolean inEventLoop(Thread thread) {         return thread == this.thread;     } 

在服务端刚启动的时候,Thread.currentThread()就是当前 main 方法对应的主线程,而this.thread还没有开始赋值,所以此时为null,

所以eventLoop.inEventLoop()在一开始调用的时候,返回的是 false,进入AbstractUnsafe的如下else逻辑中

        @Override         public final void register(EventLoop eventLoop, final ChannelPromise promise) {             ......             AbstractChannel.this.eventLoop = eventLoop;             // 首次执行的时候 eventLoop.inEventLoop() 返回 false,执行 else 逻辑             if (eventLoop.inEventLoop()) {                 ......             } else {                ......                     eventLoop.execute(new Runnable() {                         @Override                         public void run() {                             register0(promise);                         }                     });                ......             }         } 

其中executor方法对应的是SingleThreadEventExecutorexecute方法

    private void execute(Runnable task, boolean immediate) {         boolean inEventLoop = inEventLoop();         addTask(task);         if (!inEventLoop) {             startThread();             if (isShutdown()) {                 ......             }         }          if (!addTaskWakesUp && immediate) {            ......         }     } 

inEventLoop()经过上述分析,为false,所以执行startThread()方法

    private void startThread() {         if (state == ST_NOT_STARTED) {             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {                 boolean success = false;                 try {                     doStartThread();                     success = true;                 } finally {                     if (!success) {                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);                     }                 }             }         }     } 

这里主要的逻辑就是判断线程是否启动,如果没有启动,就调用doStartThread()启动。doStartThread()的逻辑是

private void doStartThread() {         assert thread == null;         executor.execute(new Runnable() {             @Override             public void run() {                 thread = Thread.currentThread();                 ...                 SingleThreadEventExecutor.this.run();                 ......             }         });     } 

通过一个成员变量thread来保存ThreadPerTaskExecutor创建出来的线程(即:FastThreadLocalThread),NioEventLoop 保存完线程的引用之后,随即调用 run 方法。

workGroup 对应的 NioEventLoop 创建线程和启动

workGroup 对应的 NioEventLoop 创建的线程主要做如下事情

  1. 执行一次事件轮询。首先轮询注册到 Reactor 线程对应的 Selector 上的所有 Channel 的 IO 事件。

  2. 处理产生 IO 事件的 Channel。如果有读写或者新连接接入事件,则处理:

  3. 处理任务队列。

以上三个步骤分别对应了下述三个方法

事件轮询

事件轮询调用了NioEventLoop的如下方法

    private int select(long deadlineNanos) throws IOException {         if (deadlineNanos == NONE) {             return selector.select();         }         // Timeout will only be 0 if deadline is within 5 microsecs         long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;         return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);     } 

处理 IO 事件的 Channel

调用的是NioEventLoop的如下方法

    private void processSelectedKeys() {         if (selectedKeys != null) {             // 处理优化过的 SelectedKeys             processSelectedKeysOptimized();         } else {             // 处理正常的 SelectedKeys             processSelectedKeysPlain(selector.selectedKeys());         }     } 

上述两个分支分别处理了不同类型的 key:重点关注优化过的 SelectedKeys,selectedKeys 在 NioEventLoop 中是一个SelectedSelectionKeySet对象,这个对象虽然叫Set,但是底层使用了数组

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {      SelectionKey[] keys;     int size;      SelectedSelectionKeySet() {         keys = new SelectionKey[1024];     }      @Override     public boolean add(SelectionKey o) {         if (o == null) {             return false;         }          keys[size++] = o;         if (size == keys.length) {             increaseCapacity();         }          return true;     }      ...... }  

add 方法的主要流程是:

  1. 将SelectionKey塞到该数组的尾部;

  2. 更新该数组的逻辑长度+1;

  3. 如果该数组的逻辑长度等于数组的物理长度,就将该数组扩容。

待程序运行一段时间后,等数组的长度足够长,每次在轮询到 NIO 事件的时候,Netty 只需要O(1)的时间复杂度就能将SelectionKey塞到set中去,而 JDK 底层使用的HashSet,put的时间复杂度最少是O(1),最差是O(n)。

进入processSelectedKeysOptimized方法

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {         for (int i = 0;; i ++) {             final SelectionKey k = selectedKeys[i];             if (k == null) {                 break;             }             // null out entry in the array to allow to have it GC'ed once the Channel close             // See https://github.com/netty/netty/issues/2363             selectedKeys[i] = null;              final Object a = k.attachment();              if (a instanceof AbstractNioChannel) {                 processSelectedKey(k, (AbstractNioChannel) a);             } else {                 @SuppressWarnings("unchecked")                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;                 processSelectedKey(k, task);             }              if (needsToSelectAgain) {                 // null out entries in the array to allow to have it GC'ed once the Channel close                 // See https://github.com/netty/netty/issues/2363                 for (;;) {                     i++;                     if (selectedKeys[i] == null) {                         break;                     }                     selectedKeys[i] = null;                 }                  selectAgain();                 // Need to flip the optimized selectedKeys to get the right reference to the array                 // and reset the index to -1 which will then set to 0 on the for loop                 // to start over again.                 //                 // See https://github.com/netty/netty/issues/1523                 selectedKeys = this.selectedKeys.flip();                 i = -1;             }         }     } 

主要是三个步骤:

第一步,取出 IO 事件及对应的 Channel。其中selectedKeys[i] = null;的目的是防止内存泄漏

第二步,处理 Channel

if (a instanceof AbstractNioChannel) {     processSelectedKey(k, (AbstractNioChannel) a); } else {     @SuppressWarnings("unchecked")     NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;     processSelectedKey(k, task); } 

Netty 的轮询注册机制其实是将 AbstractNioChannel 内部的 JDK 类 SelectableChannel 对象注册到 JDK 类 Selector 对象上,并且将 AbstractNioChannel 作为SelectableChannel 对象的一个 attachment 附属上,这样在 JDK 轮询出某条 SelectableChannel 有 IO 事件发生时,就可以直接取出 AbstractNioChannel 进行后续操作。

在Netty的Channel中,有两大类型的Channel,

一个是NioServerSocketChannel,由boss NioEventLoopGroup负责处理;

一个是NioSocketChannel,由worker NioEventLoop负责处理,

所以:

(1)对于boss NioEventLoop来说,轮询到的是连接事件,后续通过NioServerSocketChannel的Pipeline将连接交给一个worker NioEventLoop处理;

(2)对于worker NioEventLoop来说,轮询到的是读写事件,后续通过NioSocketChannel的Pipeline将读取到的数据传递给每个ChannelHandler来处理。

第三步,判断是否需要再一次轮询

needsToSelectAgain变量控制,needsToSelectAgain变量在如下方法中被调用,在NioEventLoop

private static final int CLEANUP_INTERVAL = 256;     void cancel(SelectionKey key) {         key.cancel();         cancelledKeys ++;         if (cancelledKeys >= CLEANUP_INTERVAL) {             cancelledKeys = 0;             needsToSelectAgain = true;         }     } 

cancel方法是用于将key取消,并且在被取消的key到达CLEANUP_INTERVAL的时候,设置needsToSelectAgain为 true,CLEANUP_INTERVAL默认值为256。

也就是说,对于每个NioEventLoop而言,每隔256个Channel从Selector上移除的时候,就标记needsToSelectAgain为true,然后将SelectedKeys的内部数组全部清空,方便JVM垃圾回收,然后调用selectAgain重新填装SelectionKeys数组。

处理任务队列

调用的是如下方法

    protected boolean runAllTasks() {         assert inEventLoop();         boolean fetchedAll;         boolean ranAtLeastOne = false;          do {             fetchedAll = fetchFromScheduledTaskQueue();             if (runAllTasksFrom(taskQueue)) {                 ranAtLeastOne = true;             }         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.          if (ranAtLeastOne) {             lastExecutionTime = getCurrentTimeNanos();         }         afterRunningAllTasks();         return ranAtLeastOne;     } 

一路追踪下去,进入SingleThreadEventExecutorofferTask()方法

    final boolean offerTask(Runnable task) {         if (isShutdown()) {             reject();         }         return taskQueue.offer(task);     }  

Netty 内部使用一个 taskQueue 将Task保存起来。这个 taskQueue 其实是一个 MPSC Queue,每一个 NioEventLoop 都与它一一对应。

接下来执行SingleThreadEventExecutorrunAllTasks()方法

   protected boolean runAllTasks(long timeoutNanos) {         fetchFromScheduledTaskQueue();         Runnable task = pollTask();         if (task == null) {             afterRunningAllTasks();             return false;         }          final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;         long runTasks = 0;         long lastExecutionTime;         for (;;) {             safeExecute(task);              runTasks ++;              // Check timeout every 64 tasks because nanoTime() is relatively expensive.             // XXX: Hard-coded value - will make it configurable if it is really a problem.             if ((runTasks & 0x3F) == 0) {                 lastExecutionTime = getCurrentTimeNanos();                 if (lastExecutionTime >= deadline) {                     break;                 }             }              task = pollTask();             if (task == null) {                 lastExecutionTime = getCurrentTimeNanos();                 break;             }         }          afterRunningAllTasks();         this.lastExecutionTime = lastExecutionTime;         return true;     } 

主要流程如下:

1.NioEventLoop在执行过程中不断检测是否有事件发生,如果有事件发生就处理,处理完事件之后再处理外部线程提交过来的异步任务。

2.在检测是否有事件发生的时候,为了保证异步任务的及时处理,只要有任务要处理,就立即停止事件检测,随即处理任务。

3.外部线程异步执行的任务分为两种:定时任务和普通任务,分别落地到 MpscQueue 和 PriorityQueue ,而 PriorityQueue 中的任务最终都会填充到 MpscQueue 中处理。

4.Netty每隔64个任务检查一次是否该退出任务循环。

完整代码见:hello-netty

本文所有图例见:processon: Netty学习笔记

更多内容见:Netty专栏

参考资料

跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

深度解析Netty源码

发表评论

相关文章