Netty 学习(六):创建 NioEventLoopGroup 的核心源码说明
作者: Grey
原文地址:
博客园:Netty 学习(六):创建 NioEventLoopGroup 的核心源码说明
CSDN:Netty 学习(六):创建 NioEventLoopGroup 的核心源码说明
基于 JDK 的 API 自己实现 NIO 编程,需要一个线程池来不断监听端口。接收到新连接之后,这条连接上数据的读写会在另外一个线程池中进行。
在 Netty 实现的服务端中, 有如下经典代码
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); // 设置服务端的线程模型。 // bossGroup 负责不断接收新的连接,将新的连接交给 workerGroup 来处理。 b.group(bossGroup, workerGroup)
其中 bossGroup 对应的就是监听端口的线程池,在绑定一个端口的情况下,这个线程池里只有一个线程;workerGroup 对应的是连接的数据读写的线程。
通过 debug 并设置断点的方式,我们来查看下创建 NioEventLoopGroup 的核心过程,
在没有指定线程数的情况下new NioEventLoopGroup()
会调用如下构造方法
public NioEventLoopGroup() { this(0); }
即传入 0,然后一路跟下去,发现调用了MultithreadEventLoopGroup
的如下逻辑
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }
由于我们传入的nThreads == 0
,所以获取DEFAULT_EVENT_LOOP_THREADS
的值,在MultithreadEventLoopGroup
中,DEFAULT_EVENT_LOOP_THREADS
的初始化逻辑如下
private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } }
在nThreads == 0
的情况下,那么 NioEventLoopGroup 的默认线程的个数为 CPU 的核数乘以 2,即:NettyRuntime.availableProcessors() * 2
。
继续跟下去,可以看到 NioEventLoopGroup 调用了如下的构造方法,其核心代码如下
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { …… // 创建ThreadPerTaskExecutor:ThreadPerTaskExecutor表示每次调用execute()方法的时候,都会创建一个线程。 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } …… // 2.创建NioEventLoop:NioEventLoop对应线程池里线程的概念,这里其实就是用一个for循环创建的。 children = new EventExecutor[nThreads]; …… for (int i = 0; i < nThreads; i ++) { …… children[i] = newChild(executor, args); …… } // 3.创建线程选择器:线程选择器的作用是确定每次如何从线程池中选择一个线程,也就是每次如何从NioEventLoopGroup中选择一个NioEventLoop。 chooser = chooserFactory.newChooser(children); …… }
这个构造方法包括了三个内容
-
创建 ThreadPerTaskExecutor:ThreadPerTaskExecutor 主要是用来创建线程。
-
创建 NioEventLoop:NioEventLoop 对应线程池里线程的概念。
-
创建线程选择器:线程选择器的作用是确定每次如何从线程池中选择一个线程,也就是每次如何从 NioEventLoopGroup 中选择一个 NioEventLoop。
首先,我们看 ThreadPerTaskExecutor 如何创建线程,核心代码如下
public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory"); } @Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }
这里的 threadFactory 就是前面传入的newDefaultThreadFactory()
,这个方法定义了默认线程的一些基本信息,一路追踪到DefaultThreadFactory
中
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) { ObjectUtil.checkNotNull(poolName, "poolName"); if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { throw new IllegalArgumentException( "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)"); } prefix = poolName + '-' + poolId.incrementAndGet() + '-'; this.daemon = daemon; this.priority = priority; this.threadGroup = threadGroup; } // 创建线程,将 JDK 的 Runnable 包装成 FastThreadLocalRunnable @Override public Thread newThread(Runnable r) { Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); } if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; }
可以看到 Netty 的线程实体是由 ThreadPerTaskExecutor 创建的,ThreadPerTaskExecutor 每次执行 execute 的时候都会创建一个 FastThreadLocalThread 的线程实体。
接下来是创建 NioEventLoop,Netty 使用 for 循环来创建 nThreads 个 NioEventLoop,通过前面的分析,我们可能已经猜到,一个NioEventLoop对应一个线程实体,即 Netty 自己封装的 FastThreadLocalThread。
来到 NioEventLoop 的构造方法
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler); ...... final SelectorTuple selectorTuple = openSelector(); ...... }
即创建了一个 Selector,Selector 是 NIO 编程里最核心的概念,一个 Selector 可以将多个连接绑定在一起,负责监听这些连接的读写事件,即多路复用。
继续往上调用构造方法
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, Queue<Runnable> taskQueue, RejectedExecutionHandler rejectedHandler) { ...... this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); ...... }
NioEventLoop 重写了 taskQueue 的创建逻辑
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); } private static Queue<Runnable> newTaskQueue( EventLoopTaskQueueFactory queueFactory) { if (queueFactory == null) { return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); } return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS); }
即创建一个 MPSC 队列,
MPSC 队列,Selector,NioEventLoop,这三者均为一对一关系。
接下来是创建线程选择器,
chooser = chooserFactory.newChooser(children);
这里的选择器是
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }
中的DefaultEventExecutorChooserFactory.INSTANCE
,进入
private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTwoEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } }
Netty 通过判断 NioEventLoopGroup 中的 NioEventLoop 是否是2的幂来创建不同的线程选择器,不管是哪一种选择器,最终效果都是从第一个 NioEvenLoop 遍历到最后一个NioEventLoop,再从第一个开始,如此循环。GenericEventExecutorChooser 通过简单的累加取模来实现循环的逻辑,而 PowerOfTowEventExecutorChooser 是通过位运算实现的。
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { ...... @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } ...... } private static final class GenericEventExecutorChooser implements EventExecutorChooser { ...... @Override public EventExecutor next() { return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)]; } ...... }
最后总结一下,NioEventLoopGroup 的创建核心就三步
-
创建ThreadPerTaskExecutor;
-
创建NioEventLoop;
-
创建线程选择器。
完整代码见:hello-netty
本文所有图例见:processon: Netty学习笔记
更多内容见:Netty专栏