public class Executors { ... //Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. //At any point, at most nThreads threads will be active processing tasks. //If additional tasks are submitted when all threads are active, //they will wait in the queue until a thread is available. //If any thread terminates due to a failure during execution prior to shutdown, //a new one will take its place if needed to execute subsequent tasks. //The threads in the pool will exist until it is explicitly ExecutorService#shutdown shutdown. //Fixed线程池会通过固定数量的线程,配合一个无界队列来处理提交的任务; //无论什么时候最多只能有指定数量的线程来处理任务; //如果线程池里所有的线程都在繁忙地处理任务,此时再次提交任务时,就只能把任务压入无界队列中等待; //如果线程池里的某线程挂掉了,此时会启动一个新的线程加入到线程池中; //线程池里的线程会一直存活,等待处理新提交过来的任务,直到关闭线程池; public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( nThreads,//核心线程数 nThreads,//最大线程数 0L,//线程存活时间 TimeUnit.MILLISECONDS,//线程存活时间的单位 new LinkedBlockingQueue<Runnable>()//阻塞队列,用来存放待处理的任务 ); } ... }
二.newSingleThreadExecutor()方法
该方法提供只有一个线程的线程池,这意味着所有任务只会由一个线程来执行,因此可以保证任务执行的顺序。
Single线程池会通过一个线程,配合一个无界队列来处理提交的任务。
public class Executors { ... //Creates an Executor that uses a single worker thread operating off an unbounded queue. //(Note however that if this single thread terminates due to a failure during execution prior to shutdown, //a new one will take its place if needed to execute subsequent tasks.) //Tasks are guaranteed to execute sequentially, and no more than one task will be active at any given time. //Unlike the otherwise equivalent public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService( new ThreadPoolExecutor( 1,//核心线程数 1,//最大线程数 0L,//线程存活时间 TimeUnit.MILLISECONDS,//线程存活时间的单位 new LinkedBlockingQueue<Runnable>()//阻塞队列,用来存放待处理的任务 ) ); } ... }
public class Executors { ... //Creates a thread pool that creates new threads as needed, //but will reuse previously constructed threads when they are available. //These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks. //Calls to execute will reuse previously constructed threads if available. //If no existing thread is available, a new thread will be created and added to the pool. //Threads that have not been used for sixty seconds are terminated and removed from the cache. //Thus, a pool that remains idle for long enough will not consume any resources. //Note that pools with similar properties but different details (for example, timeout parameters) //may be created using ThreadPoolExecutor constructors. public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( 0,//核心线程数 Integer.MAX_VALUE,//最大线程数 60L,//线程存活时间 TimeUnit.SECONDS,//线程存活时间的单位 new SynchronousQueue<Runnable>()//阻塞队列,用来存放待处理的任务 ); } ... }
public class Executors { ... //Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } //Creates a new ScheduledThreadPoolExecutor with the given core pool size. public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } ... }
public class ThreadPoolExecutor extends AbstractExecutorService { ... //用来存储线程池的状态和线程数量的原子变量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //从整型变量c中获取线程池的状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //从整型变量c中获取当前的线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } //用来更新线程池中的ctl原子变量,也就是更新线程池的状态和线程数量 private static int ctlOf(int rs, int wc) { return rs | wc; } ... }
public class ThreadPoolExecutor extends AbstractExecutorService { ... //Executes the given task sometime in the future. //The task may execute in a new thread or in an existing pooled thread. //If the task cannot be submitted for execution, //either because this executor has been shutdown or because its capacity has been reached, //the task is handled by the current RejectedExecutionHandler. //向线程池提交一个任务 public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } //Proceed in 3 steps: //1. If fewer than corePoolSize threads are running, //try to start a new thread with the given command as its first task. //The call to addWorker atomically checks runState and workerCount, //and so prevents false alarms that would add threads when it shouldn't, by returning false. //2. If a task can be successfully queued, //then we still need to double-check whether we should have added a thread //(because existing ones died since last checking) or that the pool shut down since entry into this method. //So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none. //3. If we cannot queue task, then we try to add a new thread. //If it fails, we know we are shut down or saturated and so reject the task. int c = ctl.get(); //首先根据ctl当前的值来判断当前线程数量是否小于核心线程数量,主要用来解决线程池中核心线程未初始化的问题 if (workerCountOf(c) < corePoolSize) { //如果小于,则调用addWorker()方法创建一个核心线程并启动,同时把当前任务command传递进去直接执行 if (addWorker(command, true)) { return; } c = ctl.get(); } //如果核心线程已经初始化,则把任务command添加到阻塞队列workQueue中 //LinkedBlockingQueue的offer()方法是非阻塞的,而put()和take()方法则是阻塞的 //如果该阻塞队列满了,不会阻塞等待其他线程take()一个元素,而是直接返回false //但LinkedBlockingQueue默认下是近乎无界的,此时可认为offer()入队时,永远返回true if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) { reject(command); } else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } else if (!addWorker(command, false)) {//如果添加失败,说明队列已满,则调用addWorker()创建非核心线程 //如果通过addWorker()方法创建非核心线程失败,则说明当前线程池中的线程总数已达到最大线程数量 //此时需要调用reject()方法执行拒绝策略 reject(command); } } //从整型变量c中获取当前的线程数量 private static int workerCountOf(int c) { return c & CAPACITY; } ... }
public class ThreadPoolExecutor extends AbstractExecutorService { ... //Checks if a new worker can be added with respect to current pool state and the given bound (either core or maximum). //If so, the worker count is adjusted accordingly, //and, if possible, a new worker is created and started, running firstTask as its first task. //This method returns false if the pool is stopped or eligible to shut down. //It also returns false if the thread factory fails to create a thread when asked. //If the thread creation fails, either due to the thread factory returning null, //or due to an exception (typically OutOfMemoryError in Thread.start()), we roll back cleanly. private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //第一部分:根据线程池状态和阻塞队列来判断是否需要创建新的Worker int c = ctl.get(); int rs = runStateOf(c); //Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) { return false; } //第一部分结束 //第二部分:通过CAS+自旋更新ctl中的线程数量 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) { return false; } if (compareAndIncrementWorkerCount(c)) { break retry; } c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) { continue retry; } //else CAS failed due to workerCount change; retry inner loop } //第二部分结束 } //第三部分:创建并启动工作线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //初始化一个Worker并把firstTask传进去,在Worker初始化后,会同步创建一个新的线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //增加全局锁,因为线程池在关闭时会抢占这把锁 //这里加锁可以避免在创建工作线程时其他线程关闭线程池 mainLock.lock(); try { //Recheck while holding lock. //Back out on ThreadFactory failure or if shut down before lock acquired. int rs = runStateOf(ctl.get()); //判断是否允许添加工作线程 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) {// precheck that t is startable throw new IllegalThreadStateException(); } //因为前面已经使用了独占锁,所以workers集合使用非线程安全的HashSet即可 workers.add(w); int s = workers.size(); if (s > largestPoolSize) { largestPoolSize = s; } workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //通过Thread的start()方法启动这个新创建的线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) { //如果线程启动失败,则要从集合中删除新增的线程,并回退增加的线程数 addWorkerFailed(w); } } return workerStarted; } ... }
public class ThreadPoolExecutor extends AbstractExecutorService { ... //Class Worker mainly maintains interrupt control state for threads running tasks, along with other minor bookkeeping. //This class opportunistically extends AbstractQueuedSynchronizer to simplify acquiring and releasing a lock surrounding each task execution. //This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run. //We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock //because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize. //Additionally, to suppress interrupts until the thread actually starts running tasks, //we initialize lock state to a negative value, and clear it upon start (in runWorker). private final class Worker extends AbstractQueuedSynchronizer implements Runnable { //Thread this worker is running in. Null if factory fails. final Thread thread; //Initial task to run. Possibly null. Runnable firstTask; //Creates with given first task and thread from ThreadFactory. //@param firstTask the first task (null if none) Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //通过getThreadFactory().newThread()方法将实现了Runnable接口的Worker传递进去来创建一个线程 //这样线程的引用就会指向传递进去的Worker实例 //所以在addWorker()方法中通过t.start()方法启动线程时,就会触发执行Worker的run()方法 this.thread = getThreadFactory().newThread(this); } //Delegates main run loop to outer runWorker public void run() { runWorker(this); } ... } ... }
public class ThreadPoolExecutor extends AbstractExecutorService { ... //Main worker run loop. //Repeatedly gets tasks from queue and executes them, while coping with a number of issues: //1.We may start out with an initial task, in which case we don't need to get the first one. //Otherwise, as long as pool is running, we get tasks from getTask. //If it returns null then the worker exits due to changed pool state or configuration parameters. //Other exits result from exception throws in external code, in which case completedAbruptly holds, //which usually leads processWorkerExit to replace this thread. //2.Before running any task, the lock is acquired to prevent other pool interrupts while the task is executing, //and then we ensure that unless pool is stopping, this thread does not have its interrupt set. //3.Each task run is preceded by a call to beforeExecute, which might throw an exception, //in which case we cause thread to die (breaking loop with completedAbruptly true) without processing the task. //4.Assuming beforeExecute completes normally, we run the task, //gathering any of its thrown exceptions to send to afterExecute. //We separately handle RuntimeException, Error (both of which the specs guarantee that we trap) and arbitrary Throwables. //Because we cannot rethrow Throwables within Runnable.run, //we wrap them within Errors on the way out (to the thread's UncaughtExceptionHandler). //Any thrown exception also conservatively causes thread to die. //5.After task.run completes, we call afterExecute, which may also throw an exception, which will also cause thread to die. //According to JLS Sec 14.20, this exception is the one that will be in effect even if task.run throws. //The net effect of the exception mechanics is that afterExecute and //the thread's UncaughtExceptionHandler have as accurate information as we can provide about any problems encountered by user code. final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //只要task不为空就一直循环,getTask()会从阻塞队列中获取任务 while (task != null || (task = getTask()) != null) { //由于Worker继承了AQS,所以w.lock()表示当前Worker要开始执行任务了 w.lock(); //If pool is stopping, ensure thread is interrupted; if not, ensure thread is not interrupted. //This requires a recheck in second case to deal with shutdownNow race while clearing interrupt //用来判断是否应该中断当前线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) { wt.interrupt();//中断当前线程 } try { beforeExecute(wt, task); Throwable thrown = null; try { //通过调用task任务的run()方法来执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //处理工作线程的回收 processWorkerExit(w, completedAbruptly); } } ... }
public class ThreadPoolExecutor extends AbstractExecutorService { ... //Performs blocking or timed wait for a task, depending on current configuration settings, //or returns null if this worker must exit because of any of: //1.There are more than maximumPoolSize workers (due to a call to setMaximumPoolSize). //2.The pool is stopped. //3.The pool is shutdown and the queue is empty. //4.This worker timed out waiting for a task, and timed-out workers are subject to termination //(that is, allowCoreThreadTimeOut || workerCount > corePoolSize) both before and after the timed wait, //and if the queue is non-empty, this worker is not the last thread in the pool. //@return task, or null if the worker must exit, in which case workerCount is decremented private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) {//自旋 int c = ctl.get(); int rs = runStateOf(c); //Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //Are workers subject to culling? //是否允许线程超时后退出 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) { return null; } continue; } try { //通过workQueue.take()方法从阻塞队列中获取任务 //如果timed为true,即允许线程超时后退出,那么就使用poll()超时获取则返回null来结束线程的run()方法 //如果timed为false,即不允许线程超时后退出,那么就使用take()方法阻塞式获取队列中的任务,此时不会结束线程的run()方法 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) { return r; } timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ... }
public class ThreadPoolExecutor extends AbstractExecutorService { ... //Performs cleanup and bookkeeping for a dying worker. //Called only from worker threads. Unless completedAbruptly is set, //assumes that workerCount has already been adjusted to account for exit. //This method removes thread from worker set, //and possibly terminates the pool or replaces the worker //if either it exited due to user task exception or //if fewer than corePoolSize workers are running or queue is non-empty but there are no workers. //@param w the worker //@param completedAbruptly if the worker died due to user exception private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) {//If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //记录总的完成任务数 completedTaskCount += w.completedTasks; //把当前线程从集合中移除 workers.remove(w); } finally { mainLock.unlock(); } //尝试结束线程池 tryTerminate(); int c = ctl.get(); //如果线程池还处于RUNNING或SHUTDOWN状态,则需要判断是否需要再增加一个工作线程来处理线程池中的任务 if (runStateLessThan(c, STOP)) { //completedAbruptly = true,说明执行的任务时出现了异常 if (!completedAbruptly) { //min代表的是核心线程数量 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //没有工作线程,阻塞队列不为空 if (min == 0 && ! workQueue.isEmpty()) { min = 1; } //如果工作线程的数量大于等于线程池的最小线程数量 if (workerCountOf(c) >= min) { return; // replacement not needed } } //如果执行的任务时出现了异常则添加工作线程 //如果工作线程数量小于最小线程数量则添加工作线程 addWorker(null, false); } } ... }
public class ThreadPoolExecutor extends AbstractExecutorService { ... //Handler called when saturated or shutdown in execute. private volatile RejectedExecutionHandler handler; //The default rejected execution handler private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); //向线程池提交一个任务 public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } int c = ctl.get(); //首先根据ctl当前的值来判断当前线程数量是否小于核心线程数量,主要用来解决线程池中核心线程未初始化的问题 if (workerCountOf(c) < corePoolSize) { //如果小于,则调用addWorker()方法创建一个核心线程并启动,同时把当前任务command传递进去直接执行 if (addWorker(command, true)) { return; } c = ctl.get(); } //如果核心线程已经初始化,则把任务command添加到阻塞队列workQueue中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) { reject(command); } else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } else if (!addWorker(command, false)) {//如果添加失败,说明队列已满,则调用addWorker()创建非核心线程 //如果通过addWorker()方法创建非核心线程失败,则说明当前线程池中的线程总数已达到最大线程数量 //此时需要调用reject()方法执行拒绝策略 reject(command); } } //Invokes the rejected execution handler for the given command. //Package-protected for use by ScheduledThreadPoolExecutor. final void reject(Runnable command) { handler.rejectedExecution(command, this); } ... //A handler for rejected tasks that throws a RejectedExecutionException. //这是ThreadPoolExecutor默认使用的拒绝策略 //这种策略就是简单抛出一个RejectedExecutionHandler异常 public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } //Always throws RejectedExecutionException. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } //A handler for rejected tasks that runs the rejected task directly in the calling thread of the execute method, //unless the executor has been shut down, in which case the task is discarded. //只要线程池没有被关闭,就由提交任务的线程执行任务的run()方法来执行 public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } //Executes task r in the caller's thread, //unless the executor has been shut down, in which case the task is discarded. //@param r the runnable task requested to be executed //@param e the executor attempting to execute this task public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } //A handler for rejected tasks that silently discards the rejected task. //直接把任务丢弃,不做任何处理 public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } //Does nothing, which has the effect of discarding task r. //@param r the runnable task requested to be executed //@param e the executor attempting to execute this task public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } //A handler for rejected tasks that discards the oldest unhandled request and then retries execute, //unless the executor is shut down, in which case the task is discarded. //通过workQueue.poll()把阻塞队列头部也就是等待最久的任务丢弃 public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } //Obtains and ignores the next task that the executor would otherwise execute, //if one is immediately available, and then retries execution of task r, //unless the executor is shut down, in which case task r is instead discarded. //@param r the runnable task requested to be executed //@param e the executor attempting to execute this task public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } } }
注意:使用无界队列的线程池(如Fixed线程池),不会使用拒绝策略。
public class ThreadPoolExecutor extends AbstractExecutorService { ... public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( nThreads,//核心线程数 nThreads,//最大线程数 0L,//线程存活时间 TimeUnit.MILLISECONDS,//线程存活时间的单位 new LinkedBlockingQueue<Runnable>()//阻塞队列,用来存放待处理的任务 ); } public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { return; } c = ctl.get(); } //通过LinkedBLockingQueue的offer()方法添加任务到无界的阻塞队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) { reject(command); } else if (workerCountOf(recheck) == 0) { addWorker(null, false); } } //Fixed线程池不会执行到这里 else if (!addWorker(command, false)) { reject(command); } } ... }