13.一文彻底了解线程池

大家好,我是王有志。关注王有志,一起聊技术,聊游戏,聊在外漂泊的生活。

最近搞了个交流群:共同富裕的Java人,核心功能是提供面试交流场所,分享八股文或面试心得,宗旨是“Javaer help Javaer”,希望能够借他人之经验,攻克我之面试,欢迎各位加入我们

下面,我们开始今天的主题:线程池。线程池是面试中必问的八股文,我将涉及到到的问题分为3大类:

  • 基础使用

    • 线程池是什么?为什么要使用线程池?

    • Executor框架是什么?

    • Java提供了哪些线程池?

  • 实现原理

    • 线程池的底层原理是如何实现的?

    • 创建线程池的参数有哪些?

    • 线程池中的线程是什么时间创建的?

  • 系统设计

    • 如何合理的设置线程池的大小?

    • 如果服务器宕机,怎么处理队列中的任务?

希望今天的内容能够帮你解答以上的问题。

Tips

  • 本文使用Java 11源码进行分析;

  • 文章会在源码中添加注释,关键内容会有单独的分析。

池化思想

在你的编程生涯中,一定遇到过各种各样的“池”,如:数据库连接池,常量池,以及今天的线程池。无一例外,它们都是借助池化思想来管理计算机中的资源。

维基百科中是这样描述“池化”的:

In resource management, pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.

“池化”指的是将资源汇聚到一起,以发挥优势或降低风险

接着来看维基百科中对“”的描述:

In computer science, a pool is a collection of resources that are kept, in memory, ready to use, rather than the memory acquired on use and the memory released afterwards.A pool client requests a resource from the pool and performs desired operations on the returned resource. When the client finishes its use of the resource, it is returned to the pool rather than released and lost.

计算机科学中的“池”,是内存中保存资源的集合,创建资源以备使用,停用时回收,而不是使用时创建,停用时丢弃。客户端从池中请求资源,并执行操作,当不再使用资源时,将资源归还到池中,而不是释放或丢弃

为什么要使用“池”?

首先"池"是资源的集合,通过“池”可以实现对资源的统一管理

其次,“池”内存放已经创建并初始化的资源,使用时直接从“池”内获取,跳过了创建及初始化的过程,提高了响应速度

最后,资源使用完成后归还到“池”中,而非丢弃或销毁,提高资源的利用率

线程池

池化思想的引入是为了解决资源管理中遇到的问题,线程池正是借助池化思想实现的线程管理工具。那么线程池可以帮助我们解决哪些实际的问题呢?

最直接的是控制线程的创建,不加以限制的创建线程会耗尽系统资源。不信的话你可以试试下面的代码:

public static void main(String[] args) {   while (true) {     new Thread(()-> {     }).start();   } } 

Tips:卡顿警告~~

其次,线程的创建和销毁是需要时间的,借助线程池可以有效的避免线程频繁的创建和销毁线程,提高程的序响应速度。

问题解答:线程池是什么?为什么要使用线程池?

Executor体系

Java中提供了功能完善的Executor体系,用于实现线程池。先来了解下Executor体系中的核心成员间的关系:

13.一文彻底了解线程池

Executor体系的最顶层是Executor接口和ExecutorService接口,它们定义了Executor体系的核心功能。

Executor接口

Executor接口的注释:

An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.

Executor接口非常简单,只定义了execute方法,主要目的是将Runnable任务与执行机制(线程,调度任务等)解耦,提供了执行Runnable任务的方法

public interface Executor {      /**    * Executes the given command at some time in the future.  The command    * may execute in a new thread, in a pooled thread, or in the calling    * thread, at the discretion of the {@code Executor} implementation.    */   void execute(Runnable command); } 

ExecutorService接口

ExecutorService接口继承了Executor接口,拓展了Executor接口的能力。ExecutorService接口的注释:

An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks.

ExecutorService接口关键方法的声明:

public interface ExecutorService extends Executor {      /**    * Initiates an orderly shutdown in which previously submitted    * tasks are executed, but no new tasks will be accepted.    * Invocation has no additional effect if already shut down.    */   void shutdown();      /**    * Attempts to stop all actively executing tasks, halts the    * processing of waiting tasks, and returns a list of the tasks    * that were awaiting execution.    * This method does not wait for actively executing tasks to    * terminate.  Use {@link #awaitTermination awaitTermination} to    * do that.    */   List<Runnable> shutdownNow();      boolean isShutdown();      boolean isTerminated();      /**    * Blocks until all tasks have completed execution after a shutdown    * request, or the timeout occurs, or the current thread is    * interrupted, whichever happens first.    */   boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;      <T> Future<T> submit(Callable<T> task);      <T> Future<T> submit(Runnable task, T result);      /**    * Submits a Runnable task for execution and returns a Future    * representing that task. The Future's {@code get} method will    * return {@code null} upon <em>successful</em> completion.    */   Future<?> submit(Runnable task); } 

对关键方法做一个说明:

  • 继承自Executor接口:

    • execute:执行Runnable任务;
  • ExecutorService接口定义的方法:

    • submit:执行RunnableCallable任务,并返回Future

    • shutdown:允许已提交的任务执行完毕,但不接受新任务的关闭;

    • shutdownNow:尝试关闭所有任务(正在/等待执行),并返回等待执行的任务。

Tips:其余方法建议阅读源码中的注释,即便是提到的4个方法,也要阅读注释

问题解答:Executor框架是什么?

ThreadPoolExecutor核心流程

Executor体系中,大家最熟悉的一定是ThreadPoolExecutor实现了,也是我们能够实现自定义线程池的基础。接下来逐步分析ThreadPoolExecutor的实现原理。

构造线程池

ThreadPoolExecutor提供了4个构造方法,我们来看参数最全的那个构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {   if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)      throw new IllegalArgumentException();   if (workQueue == null || threadFactory == null || handler == null)      throw new NullPointerException();      this.corePoolSize = corePoolSize;   this.maximumPoolSize = maximumPoolSize;   this.workQueue = workQueue;   this.keepAliveTime = unit.toNanos(keepAliveTime);   this.threadFactory = threadFactory;   this.handler = handler; } 

ThreadPoolExecutor的构造方法提供了7个参数:

  • int corePoolSize线程池的核心线程数量,创建线程的数量小于等于corePoolSize时,会一直创建线程

  • int maximumPoolSize线程池的最大线程数量,当线程数量等于corePoolSize后且队列已满,允许继续创建((maximumPoolSize-corePoolSize))个线程;

  • long keepAliveTime线程的最大空闲时间,当创建了超出corePoolSize数量的线程后,这些线程在不执行任务时能够存活的时间,超出keepAliveTime后会被销毁;

  • TimeUnit unitkeepAliveTime的单位;

  • BlockingQueue<Runnable> workQueue阻塞队列,用于保存等待执行的任务;

  • ThreadFactory threadFactory线程工厂,用于创建线程,默认使用Executors.defaultThreadFactory()

  • RejectedExecutionHandler handler拒绝策略,当队列已满,且没有空闲的线程时,执行的拒绝任务的策略。

Tips:有些小伙伴会疑问,如果每次执行一个任务,执行完毕后再执行新任务,线程池依旧会创建corePoolSize个线程吗?答案是会的,后文解释。

问题解答:创建线程池的参数有哪些?

主控状态CTL与线程池状态

ThreadPoolExecutor中定义了主控状态CTL线程池状态

/**  * The main pool control state, ctl, is an atomic integer packing  * two conceptual fields  *   workerCount, indicating the effective number of threads  *   runState,    indicating whether running, shutting down etc  */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  private static final int COUNT_BITS = Integer.SIZE - 3; // 29 private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;// 0001 1111 1111 1111 1111 1111 1111 1111  private static final int RUNNING    = -1 << COUNT_BITS;// 111 0 0000 0000 0000 0000 0000 0000 0000 private static final int SHUTDOWN   =  0 << COUNT_BITS;// 000 0 0000 0000 0000 0000 0000 0000 0000 private static final int STOP       =  1 << COUNT_BITS;// 001 0 0000 0000 0000 0000 0000 0000 0000 private static final int TIDYING    =  2 << COUNT_BITS;// 010 0 0000 0000 0000 0000 0000 0000 0000 private static final int TERMINATED =  3 << COUNT_BITS;// 011 0 0000 0000 0000 0000 0000 0000 0000  private static int runStateOf(int c)     { return c & ~COUNT_MASK; } private static int workerCountOf(int c)  { return c & COUNT_MASK; } private static int ctlOf(int rs, int wc) { return rs | wc; } 

CTL包含了两部分内容:线程池状态(runState,源码中使用rs替代)工作线程数(workCount,源码中使用wc替代)。当看到位运算符和“MASK”一起出现时,就应该想到应用了位掩码技术。

主控状态CTL的默认值是RUNNING | 0即:1110 0000 0000 0000 0000 0000 0000 0000。runStateOf方法返回低29位为0的CTL,与之对应的是线程池状态,workerCountOf方法则返回高3位为0的CTl,用低29位表示工作线程数量,所以线程池最多允许536870911个线程。

Tips

  • 工作线程指的是已经创建的线程,并不一定在执行任务,后文解释;

  • 位运算的可以参考编程技巧:“高端”的位运算

  • Java中二进制使用补码,注意原码,反码和补码间的转换。

线程池的状态

注释中对线程池的状态做出了详细的说明:

RUNNING:   Accept new tasks and process queued tasks
SHUTDOWN: Don't accept new tasks, but process queued tasks
STOP:     Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
TIDYING:   All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED: terminated() has completed

  • RUNNING:接收新任务,处理队列中的任务;

  • SHUTDOWN:不接收新任务,处理队列中的任务;

  • STOP:不接收新任务,不处理队列中的任务,中断正在执行的任务;

  • TIDYING:所有任务已经执行完毕,并且工作线程为0,转换到TIDYING状态后将执行Hook方法terminated()

  • TERMINATEDterminated()方法执行完毕。

状态的转换

注释中也对线程池状态的转换做出了详细说明:

RUNNING -> SHUTDOWN On invocation of shutdown()
(RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
SHUTDOWN -> TIDYING When both queue and pool are empty
STOP -> TIDYING When pool is empty
TIDYING -> TERMINATED When the terminated() hook method has completed

我们通过一张状态转换图来了解线程池状态之间的转换:

13.一文彻底了解线程池

结合源码,可以看到线程池的状态从RUNNING到TERMINATED其数值是单调递增的,换句话说线程池从“活着”到“死透”所对应的数值是逐步增大,所以可以使用数值间的比较去确定线程池处于哪一种状态。

使用线程池

我们已经对ThreadPoolExecutor有了一个整体的认知,现在可以创建并使用线程池了:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(6));  threadPoolExecutor.submit(() -> {   // 业务逻辑 }); 

这里我使用最“简单”的构造方法,我们看到在线程池中提交任务使用的是submit方法,该方法在抽象类AbstractExecutorService中实现:

public abstract class AbstractExecutorService implements ExecutorService {   public Future<?> submit(Runnable task) {     if (task == null) throw new NullPointerException();     RunnableFuture<Void> ftask = newTaskFor(task, null);     execute(ftask);     return ftask;   }      public Future<?> submit(Runnable task) {     if (task == null) throw new NullPointerException();     RunnableFuture<Void> ftask = newTaskFor(task, null);     execute(ftask);     return ftask;   }      public <T> Future<T> submit(Callable<T> task) {     if (task == null) throw new NullPointerException();     RunnableFuture<T> ftask = newTaskFor(task);     execute(ftask);     return ftask;   } } 

submit的重载方法之间只有参数列表的差别,实现逻辑是相同的,均是先封装RunnableFuture对象,再调用ThreadPoolExecutor#execute方法。

问题解答:submit()execute()方法有什么区别?

execute方法

继承自Executor接口的execute方法是线程池的关键方法:

public void execute(Runnable command) {   // 检测待执行任务   if (command == null) {     throw new NullPointerException();   }      // 获取主控状态CTL   int c = ctl.get();      // STEP 1: 当工作线程数量小于核心线程时,执行addWorker方法   if (workerCountOf(c) < corePoolSize) {     if (addWorker(command, true)) {       return;     }     c = ctl.get();   }      // 当工作线程数量大于核心线程数量时   // STEP 2: 首先判断线程池是否处于运行状态,接着尝试添加到队列中   if (isRunning(c) && workQueue.offer(command)) {     // 再次检查线程池状态     int recheck = ctl.get();     // 不再处于RUNNING,则从队列中删除当前任务,并执行拒绝策略     if (!isRunning(recheck) && remove(command)) {       reject(command);     } else if (workerCountOf(recheck) == 0) {       addWorker(null, false);     }   }      // STEP 3: 无法添加到队列时,尝试执行addWorker   else if (!addWorker(command, false))     // addWorker执行失败,则执行拒绝策略     reject(command); } 

阅读execute方法的源码时需要知道一个前提,addWorker方法会检查线程池状态和工作线程数量,并执行工作任务。接着来看execute方法的3种执行情况:

  • STEP 1:线程池状态:RUNNING,工作线程数:小于核心线程数,此时执行addWorker(command, true)

  • STEP 2:线程池状态:RUNNING,工作线程数:等于核心线程数,队列:未饱和,添加到队列中;

  • STEP 3:线程池状态:RUNNING,工作线程数:等于核心线程数,队列:已饱和,执行addWorker(command, false)

需要重点关注STEP 1的部分,还记得构造线程池最后的问题吗?STEP 1便解释了为什么一个接一个的执行任务,依旧会创建出corePoolSize个线程。接着我们通过一张流程图展示execute方法的执行流程:

13.一文彻底了解线程池

流程图画得比较“复杂”,因为有些判断看似在一行中执行,实际上是借助了&&运算符短路的特性来决定是否执行,例如isRunning(c) && workQueue.offer(command)中,如果isRunning(c) == false则不会执行workQueue.offer(command)

addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) 

返回值为布尔类型表示是否成功执行,参数列表中有两个参数:

  • Runnable firstTask,待执行任务;

  • boolean core,true表示最多允许创建corePoolSize个线程,false表示使用最多允许创建maximumPoolSize个线程。

在分析execute方法的过程中,我们提前“剧透”了addWorker方法的功能:

  • 检查线程池状态和工作线程数量

  • 执行工作任务

因此addWorker方法的源码部分我们分成两部分来看。

Tips再次强调本文使用Java 11源码进行分析,在addWorker方法的实现上Java 11与Java 8存在差异。

检查线程池状态和工作线程数量

第一部分是线程池状态和工作线程数量检查的源码:

retry: // 获取主控状态CTL for (int c = ctl.get();;) {      // 注释1   // Java 11相对友好很多,减少了很多!的使用,看起来比较符合人的思维   // 这部分判断可以分成两部分:   //   1. 至少为SHUTDOWN状态   //   2.条件3选1满足:   //     2-1,至少为STOP状态   //     2-2,firstTask不为空   //     2-3,workQueue为空   if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) {     return false;   }   for (;;) {     // core == true,保证工作线程数量小于核心线程数量     // core == false,保证线程数量小于最大线程数量     if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) {       return false;     }          // 增加工作线程数量并退出     if (compareAndIncrementWorkerCount(c)) {       break retry;     }          // 如果至少是SHUTDOWN状态,则重新执行     c = ctl.get();     if (runStateAtLeast(c, SHUTDOWN)) {       continue retry;     }      } } 

注释1的代码并不复杂,只是需要结合线程池在不同状态下的处理逻辑来分析:

  • 当状态“至少”为SHUTDOWN时,什么情况不需要处理?

    • 添加新的任务(对应条件2-2)

    • 队列为空(对应条件2-3)

  • 当状态“至少”为STOP时,线程池应当立即停止,不接收,不处理。

Tips:线程池状态的部分说线程池状态从RUNNING到TERMINATED是单调递增的,因此在Java 11的实现中才会出现runStateAtLeast方法。

执行工作任务

第二部分是执行工作任务的源码:

boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try {   // 创建Worker对象   w = new Worker(firstTask);   // 从worker对象中获取线程   final Thread t = w.thread;   if (t != null) {     // 上锁     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {       int c = ctl.get();       // 线程池状态检查       // RUNNING状态,或者“小于”STOP状态(处理队列中的任务)       if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {         // 线程状态检查         if (t.getState() != Thread.State.NEW) {           throw new IllegalThreadStateException();         }         // 将Worker对象添加到workers中         workers.add(w);         workerAdded = true;         int s = workers.size();         if (s > largestPoolSize) {           // 记录线程池中出现过的最大线程数           largestPoolSize = s;         }       }     } finally {       mainLock.unlock();     }          if (workerAdded) {       // 启动线程       t.start();       workerStarted = true;     }   } } finally {   if (! workerStarted) {     // addWorker执行失败     // addWorkerFailed中包含工作线程数减1的逻辑     addWorkerFailed(w);   }   } return workerStarted; 

结合两部分代码,一个正向流程是这样的:

  • 检查状态:检查是否允许创建Worker,如果允许执行compareAndIncrementWorkerCount(c),CTL中工作线程数量+1;

  • 执行任务:创建Worker对象,通过Worker对象获取线程,添加到workers中,最后启动线程。

回过头看我们之前一直提到的工作线程,实际上是Worker对象,我们可以近似的将Worker对象和工作线程画上等号。

问题解答:线程池中的线程是什么时间创建的?

三调addWorker

execute方法中,有3种情况调用addWorker方法:

  • STEP 1addWorker(command, true)

  • STEP 2addWorker(null, false)

  • STEP 3addWorker(command, false)

STEP 1和STEP 3很好理解,STEP 1最多允许创建corePoolSize个线程,STEP 3最多允许创建maximumPoolSize个线程。STEP 2就比较难理解了,传入了空任务然后调用addWorker方法。

什么情况下会执行到addWorker(null, false)

  • 第1个条件:(workerCount geq corePoolSize)

  • 第2个条件:isRunning(c) && workQueue.offer(command)

  • 第3个条件:workerCountOf(recheck) == 0

处于RUNNING状态的条件不难理解,矛盾的是第1个条件和第3个条件。根据这两个条件可以得到:(corePoolSize leq workCount = 0),也就是说允许创建核心线程数为0的线程池

接着我们来看addWorker(null, false)做了什么?创建了Worker对象,添加到workers中,并调用了一次Thread.start,虽然没有任何待执行的任务