Java 多线程(七):线程池
作者:Grey
原文地址:
工作原理
线程池内部是通过队列结合线程实现的,当我们利用线程池执行任务时:
-
如果此时线程池中的线程数量小于
corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 -
如果此时线程池中的线程数量等于
corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。 -
如果此时线程池中的线程数量大于等于
corePoolSize,缓冲队列workQueue已满,并且线程池中的线程数量小于maximumPoolSize,建新的线程来处理被添加的任务。 -
如果此时线裎池中的线数量大于
corePoolSize,缓存冲队列workQueue已满, 并且线程池中的数量等于maximumPoolSize,那么过handler所指定的策略来处理此任务。 -
当线程池中的线程数量大于
corePoolSize时,如果某线程空闲时间超过keepAliveTime, 线将被终止。这样,线程池可以动态的调整池中的线程数。
相关配置
corePoolSize:核心线程数
maximumPoolSize:最大线程数 【包括核心线程数】
keepAliveTime:生存时间【线程长时间不干活了,归还给操作系统,核心线程不用归还,可以指定是否参与归还过程】
生存时间单位
任务队列:等待队列,如果不指定,最大值是Integer.MAX_VALUE【各种各样的BlockingQueue】
线程工厂【默认设置优先级是普通优先级,非守护线程】,最好自定义线程名称,方便回溯
拒绝策略,包括以下四种:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
执行流程:先占满核心线程-> 再占满任务队列-> 再占满(最大线程数-核心线程数)-> 最后执行拒绝策略
一般自定义拒绝策略:将相关信息保存到redis,kafka,日志,MySQL记录 实现RejectedExecutionHandler并重写rejectedExecution方法
自定义拒绝策略代码示例:
package git.snippets.juc; import java.util.concurrent.*; /** * 自定义拒绝策略 */ public class MyRejectedHandler { public static void main(String[] args) { ExecutorService service = new ThreadPoolExecutor(4, 4, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(), new MyHandler()); } static class MyHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //log("r rejected") //save r kafka mysql redis //try 3 times if (executor.getQueue().size() < 10000) { //try put again(); } } } }
SingleThreadPool
-
保证线程按顺序执行
-
为什么要有单线程的线程池?这个主要是用来做任务队列和线程生命周期管理
-
使用LinkedBlockingQueue作为任务队列,上界为:Integer.MAX_VALUE(2147483647) 约等于无界。
示例代码见:
package git.snippets.juc; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static java.util.concurrent.TimeUnit.SECONDS; public class SingleThreadPoolUsage { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int j = i; service.submit(() -> System.out.println("current thread " + Thread.currentThread() + " " + j)); } service.shutdown(); service.awaitTermination(60, SECONDS); } }
CachedThreadPool
-
corePoolSize:0
-
maxiumPoolSize:Integer.MAX_VALUE(2147483647)
-
keepAliveTime 60秒
-
使用
SynchronousQueue作为任务队列 必须马上执行
使用示例:
package git.snippets.juc; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class CachedThreadPoolUsage { public static void main(String[] args) throws InterruptedException { System.out.println("cached thread pool usage..."); ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for (int i = 0; i < 2; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); TimeUnit.SECONDS.sleep(80); System.out.println(service); } }
FixedThreadPool
-
最大线程数等于核心线程数
-
使用
LinkedBlockingQueue作为任务队列,上界为:Integer.MAX_VALUE(2147483647)
使用示例见:
package git.snippets.juc; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * 多线程和单线程计算某个范围内的所有素数 */ public class FixedThreadPoolUsage { public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); getPrime(1, 200000); long end = System.currentTimeMillis(); System.out.println("use single thread...cost: " + (end - start)); final int cpuCoreNum = 4; ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum); MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20 MyTask t2 = new MyTask(80001, 130000); MyTask t3 = new MyTask(130001, 170000); MyTask t4 = new MyTask(170001, 200000); Future<List<Integer>> f1 = service.submit(t1); Future<List<Integer>> f2 = service.submit(t2); Future<List<Integer>> f3 = service.submit(t3); Future<List<Integer>> f4 = service.submit(t4); System.out.println(); start = System.currentTimeMillis(); f1.get(); f2.get(); f3.get(); f4.get(); end = System.currentTimeMillis(); System.out.println("use fixed thread pool...cost: " + (end - start)); service.shutdown(); service.awaitTermination(1, TimeUnit.MINUTES); } static boolean isPrime(int num) { for (int i = 2; i <= num / 2; i++) { if (num % i == 0) { return false; } } return true; } static List<Integer> getPrime(int start, int end) { List<Integer> results = new ArrayList<>(); for (int i = start; i <= end; i++) { if (isPrime(i)) results.add(i); } return results; } static class MyTask implements Callable<List<Integer>> { int startPos, endPos; MyTask(int s, int e) { this.startPos = s; this.endPos = e; } @Override public List<Integer> call() { List<Integer> r = getPrime(startPos, endPos); return r; } } }
代码说明:本实例演示了多线程和单线程计算某个范围内的所有素数。输出结果如下
use single thread...cost: 1733 use fixed thread pool...cost: 505
ScheduledThreadPool
使用DelayWorkQueue,包括了如下两个主要方法
scheduleAtFixedRate()
当前任务执行时间小于间隔时间,每次到点即执行;
当前任务执行时间大于等于间隔时间,任务执行后立即执行下一次任务。相当于连续执行了。
scheduleWithFixedDelay()
每当上次任务执行完毕后,间隔一段时间执行。不管当前任务执行时间大于、等于还是小于间隔时间,执行效果都是一样的。
使用示例:
package git.snippets.juc; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static java.util.concurrent.TimeUnit.SECONDS; public class ScheduleThreadPoolUsage { static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public static void main(String[] args) { test1(); test2(); test3(); } /** * 任务执行时间(8s)小于间隔时间(10s) */ public static void test1() { scheduler.scheduleAtFixedRate(() -> { System.out.println("Start: scheduleAtFixedRate: " + new Date()); try { Thread.sleep(8000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("End : scheduleAtFixedRate: " + new Date()); }, 0, 10, SECONDS); } /** * 任务执行时间(12s)大于间隔时间(10s) */ public static void test2() { scheduler.scheduleAtFixedRate(() -> { System.out.println("Start: scheduleAtFixedRate: " + new Date()); try { Thread.sleep(12000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("End : scheduleAtFixedRate: " + new Date()); }, 0, 10, SECONDS); } /** * 任务执行时间(8s)小于间隔时间(10s) */ public static void test3() { scheduler.scheduleWithFixedDelay(() -> { System.out.println("Start: scheduleWithFixedDelay: " + new Date()); try { Thread.sleep(12000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("End : scheduleWithFixedDelay: " + new Date()); }, 0, 10, SECONDS); } }
ForkJoinPool
Java SE 1.7 以后新增的线程池,包括以下两个核心类
第一个是:RecursiveAction
它是一种没有任何返回值的任务。只是做一些工作,比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者 CPU 执行。
我们可以通过继承来实现一个RecursiveAction。
第二个是:RecursiveTask
它是一种会返回结果的任务。可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并。
使用示例:
package git.snippets.juc; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; import java.util.stream.LongStream; /** * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2021/4/25 * @since 1.7 */ public class ForkJoinPoolUsage implements Calculator { private ForkJoinPool pool; public ForkJoinPoolUsage() { // 也可以使用公用的 ForkJoinPool: // pool = ForkJoinPool.commonPool() pool = new ForkJoinPool(); } public static void useRecursiveAction() throws InterruptedException { // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool ForkJoinPool forkJoinPool = new ForkJoinPool(); // 提交可分解的PrintTask任务 forkJoinPool.submit(new MyRecursiveAction(0, 1000)); while (!forkJoinPool.isTerminated()) { forkJoinPool.awaitTermination(2, TimeUnit.SECONDS); } // 关闭线程池 forkJoinPool.shutdown(); } public static void useRecursiveTask() { long[] numbers = LongStream.rangeClosed(1, 1000).toArray(); Calculator calculator = new ForkJoinPoolUsage(); System.out.println(calculator.sumUp(numbers)); // 打印结果500500 } public static void main(String[] args) throws InterruptedException { useRecursiveTask(); useRecursiveAction(); } @Override public long sumUp(long[] numbers) { return pool.invoke(new SumTask(numbers, 0, numbers.length - 1)); } private static class MyRecursiveAction extends RecursiveAction { /** * 每个"小任务"最多只打印20个数 */ private static final int MAX = 20; private int start; private int end; public MyRecursiveAction(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { //当end-start的值小于MAX时,开始打印 if ((end - start) < MAX) { for (int i = start; i < end; i++) { System.out.println(Thread.currentThread().getName() + "-i的值" + i); } } else { // 将大任务分解成两个小任务 int middle = (start + end) / 2; MyRecursiveAction left = new MyRecursiveAction(start, middle); MyRecursiveAction right = new MyRecursiveAction(middle, end); left.fork(); right.fork(); } } } private static class SumTask extends RecursiveTask<Long> { private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override protected Long compute() { // 当需要计算的数字小于6时,直接计算结果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; // 否则,把任务一分为二,递归计算 } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle + 1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } } interface Calculator { long sumUp(long[] numbers); }
此外,Java 的流式 API 底层也是 ForkJoinPool 实现的。
更多内容参考如下两篇文章
WorkStealingPool
每个线程都有单独的队列,每个线程队列执行完毕后,就会去其他的线程队列里面拿过来执行, 底层是
ForkJoinPool
-
Java SE 1.8 新增
-
会自动启动 CPU 核数个线程去执行任务
使用示例:
/** * */ package git.snippets.juc; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @since 1.8 */ public class WorkStealingPoolUsage { public static void main(String[] args) throws IOException { int core = Runtime.getRuntime().availableProcessors(); // 会自动启动cpu核数个线程去执行任务 ,其中第一个是1s执行完毕,其余都是2s执行完毕, // 有一个任务会进行等待,当第一个执行完毕后,会再次偷取最后一个任务执行 ExecutorService service = Executors.newWorkStealingPool(); service.execute(new R(1000)); for (int i = 0; i < core; i++) { service.execute(new R(2000)); } //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出 System.in.read(); } static class R implements Runnable { int time; R(int t) { this.time = t; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time + " " + Thread.currentThread().getName()); } } }
CompletableFuture
-
Java SE 1.8 新增
-
anyOf()可以实现“任意个 CompletableFuture 只要一个成功”,allOf()可以实现“所有 CompletableFuture 都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
使用示例:
package git.snippets.juc; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** * 假设你能够提供一个服务 * 这个服务查询各大电商网站同一类产品的价格并汇总展示 */ public class CompletableFutureUsage { public static void main(String[] args) throws ExecutionException, InterruptedException { way1(); way2(); } public static void way1() { long start = System.currentTimeMillis(); System.out.println("p1 " + priceOfJD()); System.out.println("p2 " + priceOfTB()); System.out.println("p3 " + priceOfTM()); long end = System.currentTimeMillis(); System.out.println("串行执行,耗时(ms):" + (end - start)); } public static void way2() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); CompletableFuture<Double> p1 = CompletableFuture.supplyAsync(() -> priceOfJD()); CompletableFuture<Double> p2 = CompletableFuture.supplyAsync(() -> priceOfTB()); CompletableFuture<Double> p3 = CompletableFuture.supplyAsync(() -> priceOfTM()); CompletableFuture.allOf(p1, p2, p3).join(); System.out.println("p1 " + p1.get()); System.out.println("p2 " + p2.get()); System.out.println("p3 " + p3.get()); long end = System.currentTimeMillis(); System.out.println("使用CompletableFuture并行执行,耗时(ms): " + (end - start)); } private static double priceOfTM() { delay(); return 1.00; } private static double priceOfTB() { delay(); return 2.00; } private static double priceOfJD() { delay(); return 3.00; } private static void delay() { int time = new Random().nextInt(500); try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }
证明原子操作类比synchronized更高效
示例代码如下
package git.snippets.juc; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** * 证明原子操作类比synchronized更高效 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2021/4/26 */ public class AtomVSSync { public static void main(String[] args) { test1(); } AtomicInteger atomicCount = new AtomicInteger(0); int count = 0; final static int TIMES = 80000000; void m() { for (int i = 0; i < TIMES; i++) { atomicCount.incrementAndGet(); //原子操作 } } void m2() { for (int i = 0; i < TIMES; i++) { synchronized (this) { count++; } } } public static void test1() { AtomVSSync t1 = new AtomVSSync(); AtomVSSync t2 = new AtomVSSync(); long time1 = time(t1::m); System.out.println("使用原子类得到的结果是:" + t1.atomicCount); long time2 = time(t2::m2); System.out.println("使用synchronized得到的结果是:" + t2.count); System.out.println("使用原子类花费的时间是:" + time1); System.out.println("使用 synchronized 花费的时间是 :" + time2); } private static long time(Runnable runnable) { List<Thread> threads = new ArrayList<>(); long startTime = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { threads.add(new Thread(runnable, "thread-" + i)); } threads.forEach(Thread::start); threads.forEach(o -> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long endTime = System.currentTimeMillis(); return endTime - startTime; } }
Java SE 11 下运行上述代码,代码输出结果如下:
使用原子类得到的结果是:800000000 使用synchronized得到的结果是:800000000 使用原子类花费的时间是:12111 使用 synchronized 花费的时间是 :16471
AtomXXX类可以保证可见性吗?
可以。
代码如下
package git.snippets.juc; import java.util.concurrent.atomic.AtomicBoolean; /** * AtomXXX类可以保证可见性吗?请写一个程序来证明 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2021/4/26 */ public class AtomVisible { public static void main(String[] args) { test2(); } AtomicBoolean running = new AtomicBoolean(true); void m3() { System.out.println("m1 start"); while (running.get()) { //死循环。只有running=false时,才能执行后面的语句 } System.out.println("m2 end"); } public static void test2() { AtomVisible t = new AtomVisible(); new Thread(t::m3, "t1").start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } t.running.getAndSet(false); } }
输出结果
m1 start m2 end
写一个程序证明AtomXXX类的多个方法并不构成原子性
package git.snippets.juc; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; /** * 写一个程序证明AtomXXX类的多个方法并不构成原子性 * * @author <a href="mailto:410486047@qq.com">Grey</a> * @date 2021/4/26 */ public class MultiAtomMethod { public static void main(String[] args) { test3(); } AtomicInteger count = new AtomicInteger(0); void m4() { for (int i = 0; i < 10000; i++) { if (count.get() < 999 && count.get() >= 0) { //如果未加锁,之间还会有其他线程插进来 count.incrementAndGet(); } } } public static void test3() { MultiAtomMethod t = new MultiAtomMethod(); List<Thread> threads = new ArrayList<>(); for (int i = 0; i < 100; i++) { threads.add(new Thread(t::m4, "thread" + i)); } threads.forEach(Thread::start); threads.forEach((o) -> { try { //join()方法阻塞调用此方法的线程,直到线程t完成,此线程再继续。通常用于在main()主线程内,等待其它线程完成再结束main()主线程。 o.join(); //相当于在main线程中同步o线程,o执行完了,main线程才有执行的机会 } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }
说明
本文涉及到的所有代码和图例
更多内容见:Java 多线程
参考资料
理解ScheduledExecutorService中scheduleAtFixedRate和scheduleWithFixedDelay的区别