Java 多线程(七):线程池

Java 多线程(七):线程池

作者:Grey

原文地址:

博客园:Java 多线程(七):线程池

CSDN:Java 多线程(七):线程池

工作原理

线程池内部是通过队列结合线程实现的,当我们利用线程池执行任务时:

  1. 如果此时线程池中的线程数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

  2. 如果此时线程池中的线程数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。

  3. 如果此时线程池中的线程数量大于等于corePoolSize,缓冲队列workQueue已满,并且线程池中的线程数量小于maximumPoolSize,建新的线程来处理被添加的任务。

  4. 如果此时线裎池中的线数量大于corePoolSize,缓存冲队列workQueue已满, 并且线程池中的数量等于maximumPoolSize,那么过handler所指定的策略来处理此任务。

  5. 当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime, 线将被终止。这样,线程池可以动态的调整池中的线程数。

相关配置

corePoolSize:核心线程数

maximumPoolSize:最大线程数 【包括核心线程数】

keepAliveTime:生存时间【线程长时间不干活了,归还给操作系统,核心线程不用归还,可以指定是否参与归还过程】

生存时间单位

任务队列:等待队列,如果不指定,最大值是Integer.MAX_VALUE【各种各样的BlockingQueue

线程工厂【默认设置优先级是普通优先级,非守护线程】,最好自定义线程名称,方便回溯

拒绝策略,包括以下四种:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。

ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务

ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务

执行流程:先占满核心线程-> 再占满任务队列-> 再占满(最大线程数-核心线程数)-> 最后执行拒绝策略
一般自定义拒绝策略:将相关信息保存到redis,kafka,日志,MySQL记录 实现RejectedExecutionHandler并重写rejectedExecution方法

自定义拒绝策略代码示例:

package git.snippets.juc;  import java.util.concurrent.*;  /**  * 自定义拒绝策略  */ public class MyRejectedHandler {     public static void main(String[] args) {         ExecutorService service = new ThreadPoolExecutor(4, 4,                 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),                 Executors.defaultThreadFactory(),                 new MyHandler());     }      static class MyHandler implements RejectedExecutionHandler {          @Override         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {             //log("r rejected")             //save r kafka mysql redis             //try 3 times             if (executor.getQueue().size() < 10000) {                 //try put again();             }         }     } }  

SingleThreadPool

  • 保证线程按顺序执行

  • 为什么要有单线程的线程池?这个主要是用来做任务队列和线程生命周期管理

  • 使用LinkedBlockingQueue作为任务队列,上界为:Integer.MAX_VALUE(2147483647) 约等于无界。

示例代码见:

package git.snippets.juc;  import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;  import static java.util.concurrent.TimeUnit.SECONDS;  public class SingleThreadPoolUsage {     public static void main(String[] args) throws InterruptedException {         ExecutorService service = Executors.newSingleThreadExecutor();         for (int i = 0; i < 10; i++) {             final int j = i;             service.submit(() -> System.out.println("current thread " + Thread.currentThread() + "  " + j));         }         service.shutdown();         service.awaitTermination(60, SECONDS);     } }  

CachedThreadPool

  • corePoolSize:0

  • maxiumPoolSize:Integer.MAX_VALUE(2147483647)

  • keepAliveTime 60秒

  • 使用SynchronousQueue作为任务队列 必须马上执行

使用示例:

package git.snippets.juc;  import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;  public class CachedThreadPoolUsage {     public static void main(String[] args) throws InterruptedException {         System.out.println("cached thread pool usage...");         ExecutorService service = Executors.newCachedThreadPool();         System.out.println(service);         for (int i = 0; i < 2; i++) {             service.execute(() -> {                 try {                     TimeUnit.MILLISECONDS.sleep(500);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 System.out.println(Thread.currentThread().getName());             });         }         System.out.println(service);         TimeUnit.SECONDS.sleep(80);         System.out.println(service);     } }  

FixedThreadPool

  • 最大线程数等于核心线程数

  • 使用LinkedBlockingQueue作为任务队列,上界为:Integer.MAX_VALUE(2147483647)

使用示例见:

package git.snippets.juc;  import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit;  /**  * 多线程和单线程计算某个范围内的所有素数  */ public class FixedThreadPoolUsage {     public static void main(String[] args) throws InterruptedException, ExecutionException {         long start = System.currentTimeMillis();         getPrime(1, 200000);         long end = System.currentTimeMillis();         System.out.println("use single thread...cost: " + (end - start));          final int cpuCoreNum = 4;          ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);          MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20         MyTask t2 = new MyTask(80001, 130000);         MyTask t3 = new MyTask(130001, 170000);         MyTask t4 = new MyTask(170001, 200000);          Future<List<Integer>> f1 = service.submit(t1);         Future<List<Integer>> f2 = service.submit(t2);         Future<List<Integer>> f3 = service.submit(t3);         Future<List<Integer>> f4 = service.submit(t4);         System.out.println();         start = System.currentTimeMillis();         f1.get();         f2.get();         f3.get();         f4.get();         end = System.currentTimeMillis();         System.out.println("use fixed thread pool...cost: " + (end - start));         service.shutdown();         service.awaitTermination(1, TimeUnit.MINUTES);     }      static boolean isPrime(int num) {         for (int i = 2; i <= num / 2; i++) {             if (num % i == 0) {                 return false;             }         }         return true;     }      static List<Integer> getPrime(int start, int end) {         List<Integer> results = new ArrayList<>();         for (int i = start; i <= end; i++) {             if (isPrime(i)) results.add(i);         }          return results;     }      static class MyTask implements Callable<List<Integer>> {         int startPos, endPos;          MyTask(int s, int e) {             this.startPos = s;             this.endPos = e;         }          @Override         public List<Integer> call() {             List<Integer> r = getPrime(startPos, endPos);             return r;         }      } } 

代码说明:本实例演示了多线程和单线程计算某个范围内的所有素数。输出结果如下

use single thread...cost: 1733  use fixed thread pool...cost: 505 

ScheduledThreadPool

使用DelayWorkQueue,包括了如下两个主要方法

scheduleAtFixedRate()

当前任务执行时间小于间隔时间,每次到点即执行;

当前任务执行时间大于等于间隔时间,任务执行后立即执行下一次任务。相当于连续执行了。

scheduleWithFixedDelay()

每当上次任务执行完毕后,间隔一段时间执行。不管当前任务执行时间大于、等于还是小于间隔时间,执行效果都是一样的。

使用示例:

package git.snippets.juc;  import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService;  import static java.util.concurrent.TimeUnit.SECONDS;  public class ScheduleThreadPoolUsage {     static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);       public static void main(String[] args) {         test1();         test2();         test3();     }      /**      * 任务执行时间(8s)小于间隔时间(10s)      */     public static void test1() {         scheduler.scheduleAtFixedRate(() -> {             System.out.println("Start: scheduleAtFixedRate:    " + new Date());             try {                 Thread.sleep(8000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("End  : scheduleAtFixedRate:    " + new Date());         }, 0, 10, SECONDS);     }      /**      * 任务执行时间(12s)大于间隔时间(10s)      */     public static void test2() {         scheduler.scheduleAtFixedRate(() -> {             System.out.println("Start: scheduleAtFixedRate:    " + new Date());             try {                 Thread.sleep(12000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("End  : scheduleAtFixedRate:    " + new Date());         }, 0, 10, SECONDS);     }      /**      * 任务执行时间(8s)小于间隔时间(10s)      */     public static void test3() {         scheduler.scheduleWithFixedDelay(() -> {             System.out.println("Start: scheduleWithFixedDelay: " + new Date());             try {                 Thread.sleep(12000);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("End  : scheduleWithFixedDelay: " + new Date());         }, 0, 10, SECONDS);     } }  

ForkJoinPool

Java SE 1.7 以后新增的线程池,包括以下两个核心类

第一个是:RecursiveAction

它是一种没有任何返回值的任务。只是做一些工作,比如写数据到磁盘,然后就退出了。 一个RecursiveAction可以把自己的工作分割成更小的几块, 这样它们可以由独立的线程或者 CPU 执行。
我们可以通过继承来实现一个RecursiveAction

第二个是:RecursiveTask

它是一种会返回结果的任务。可以将自己的工作分割为若干更小任务,并将这些子任务的执行合并到一个集体结果。 可以有几个水平的分割和合并。

使用示例:

package git.snippets.juc;  import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; import java.util.stream.LongStream;  /**  * @author <a href="mailto:410486047@qq.com">Grey</a>  * @date 2021/4/25  * @since 1.7  */ public class ForkJoinPoolUsage implements Calculator {     private ForkJoinPool pool;      public ForkJoinPoolUsage() {         // 也可以使用公用的 ForkJoinPool:         // pool = ForkJoinPool.commonPool()         pool = new ForkJoinPool();     }      public static void useRecursiveAction() throws InterruptedException {         // 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool         ForkJoinPool forkJoinPool = new ForkJoinPool();         // 提交可分解的PrintTask任务         forkJoinPool.submit(new MyRecursiveAction(0, 1000));          while (!forkJoinPool.isTerminated()) {             forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);         }         // 关闭线程池         forkJoinPool.shutdown();     }      public static void useRecursiveTask() {         long[] numbers = LongStream.rangeClosed(1, 1000).toArray();         Calculator calculator = new ForkJoinPoolUsage();         System.out.println(calculator.sumUp(numbers)); // 打印结果500500     }      public static void main(String[] args) throws InterruptedException {         useRecursiveTask();         useRecursiveAction();     }      @Override     public long sumUp(long[] numbers) {         return pool.invoke(new SumTask(numbers, 0, numbers.length - 1));     }      private static class MyRecursiveAction extends RecursiveAction {          /**          * 每个"小任务"最多只打印20个数          */         private static final int MAX = 20;          private int start;         private int end;          public MyRecursiveAction(int start, int end) {             this.start = start;             this.end = end;         }          @Override         protected void compute() {             //当end-start的值小于MAX时,开始打印             if ((end - start) < MAX) {                 for (int i = start; i < end; i++) {                     System.out.println(Thread.currentThread().getName() + "-i的值" + i);                 }             } else {                 // 将大任务分解成两个小任务                 int middle = (start + end) / 2;                 MyRecursiveAction left = new MyRecursiveAction(start, middle);                 MyRecursiveAction right = new MyRecursiveAction(middle, end);                 left.fork();                 right.fork();             }         }       }      private static class SumTask extends RecursiveTask<Long> {         private long[] numbers;         private int from;         private int to;          public SumTask(long[] numbers, int from, int to) {             this.numbers = numbers;             this.from = from;             this.to = to;         }           @Override         protected Long compute() {              // 当需要计算的数字小于6时,直接计算结果             if (to - from < 6) {                 long total = 0;                 for (int i = from; i <= to; i++) {                     total += numbers[i];                 }                 return total;                 // 否则,把任务一分为二,递归计算             } else {                 int middle = (from + to) / 2;                 SumTask taskLeft = new SumTask(numbers, from, middle);                 SumTask taskRight = new SumTask(numbers, middle + 1, to);                 taskLeft.fork();                 taskRight.fork();                 return taskLeft.join() + taskRight.join();             }         }     } }  interface Calculator {     long sumUp(long[] numbers); } 

此外,Java 的流式 API 底层也是 ForkJoinPool 实现的。

更多内容参考如下两篇文章

ForkJoinPool 的使用以及原理

聊聊并发(八)——Fork/Join 框架介绍

WorkStealingPool

每个线程都有单独的队列,每个线程队列执行完毕后,就会去其他的线程队列里面拿过来执行, 底层是ForkJoinPool

  • Java SE 1.8 新增

  • 会自动启动 CPU 核数个线程去执行任务

使用示例:

/**  *  */ package git.snippets.juc;  import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;  /**  * @since 1.8  */ public class WorkStealingPoolUsage {     public static void main(String[] args) throws IOException {         int core = Runtime.getRuntime().availableProcessors();         //  会自动启动cpu核数个线程去执行任务 ,其中第一个是1s执行完毕,其余都是2s执行完毕,         //  有一个任务会进行等待,当第一个执行完毕后,会再次偷取最后一个任务执行         ExecutorService service = Executors.newWorkStealingPool();         service.execute(new R(1000));         for (int i = 0; i < core; i++) {             service.execute(new R(2000));         }         //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出         System.in.read();     }      static class R implements Runnable {          int time;          R(int t) {             this.time = t;         }          @Override         public void run() {             try {                 TimeUnit.MILLISECONDS.sleep(time);             } catch (InterruptedException e) {                 e.printStackTrace();             }              System.out.println(time + " " + Thread.currentThread().getName());         }     } }  

CompletableFuture

  • Java SE 1.8 新增

  • anyOf()可以实现“任意个 CompletableFuture 只要一个成功”,allOf()可以实现“所有 CompletableFuture 都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。

使用示例:

package git.snippets.juc;  import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit;  /**  * 假设你能够提供一个服务  * 这个服务查询各大电商网站同一类产品的价格并汇总展示  */ public class CompletableFutureUsage {     public static void main(String[] args) throws ExecutionException, InterruptedException {         way1();         way2();     }      public static void way1() {         long start = System.currentTimeMillis();         System.out.println("p1 " + priceOfJD());         System.out.println("p2 " + priceOfTB());         System.out.println("p3 " + priceOfTM());         long end = System.currentTimeMillis();         System.out.println("串行执行,耗时(ms):" + (end - start));     }      public static void way2() throws ExecutionException, InterruptedException {         long start = System.currentTimeMillis();         CompletableFuture<Double> p1 = CompletableFuture.supplyAsync(() -> priceOfJD());         CompletableFuture<Double> p2 = CompletableFuture.supplyAsync(() -> priceOfTB());         CompletableFuture<Double> p3 = CompletableFuture.supplyAsync(() -> priceOfTM());         CompletableFuture.allOf(p1, p2, p3).join();         System.out.println("p1 " + p1.get());         System.out.println("p2 " + p2.get());         System.out.println("p3 " + p3.get());         long end = System.currentTimeMillis();         System.out.println("使用CompletableFuture并行执行,耗时(ms): " + (end - start));     }      private static double priceOfTM() {         delay();         return 1.00;     }      private static double priceOfTB() {         delay();         return 2.00;     }      private static double priceOfJD() {         delay();         return 3.00;     }      private static void delay() {         int time = new Random().nextInt(500);         try {             TimeUnit.MILLISECONDS.sleep(time);         } catch (InterruptedException e) {             e.printStackTrace();         }     } }  

证明原子操作类比synchronized更高效

示例代码如下

package git.snippets.juc;  import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger;  /**  * 证明原子操作类比synchronized更高效  *  * @author <a href="mailto:410486047@qq.com">Grey</a>  * @date 2021/4/26  */ public class AtomVSSync {     public static void main(String[] args) {         test1();     }      AtomicInteger atomicCount = new AtomicInteger(0);     int count = 0;     final static int TIMES = 80000000;      void m() {         for (int i = 0; i < TIMES; i++) {             atomicCount.incrementAndGet(); //原子操作         }     }      void m2() {         for (int i = 0; i < TIMES; i++) {             synchronized (this) {                 count++;             }         }     }       public static void test1() {         AtomVSSync t1 = new AtomVSSync();         AtomVSSync t2 = new AtomVSSync();         long time1 = time(t1::m);         System.out.println("使用原子类得到的结果是:" + t1.atomicCount);         long time2 = time(t2::m2);         System.out.println("使用synchronized得到的结果是:" + t2.count);          System.out.println("使用原子类花费的时间是:" + time1);         System.out.println("使用 synchronized 花费的时间是 :" + time2);     }      private static long time(Runnable runnable) {         List<Thread> threads = new ArrayList<>();         long startTime = System.currentTimeMillis();         for (int i = 0; i < 10; i++) {             threads.add(new Thread(runnable, "thread-" + i));         }         threads.forEach(Thread::start);         threads.forEach(o -> {             try {                 o.join();             } catch (InterruptedException e) {                 e.printStackTrace();             }         });         long endTime = System.currentTimeMillis();         return endTime - startTime;     } }  

Java SE 11 下运行上述代码,代码输出结果如下:

使用原子类得到的结果是:800000000 使用synchronized得到的结果是:800000000 使用原子类花费的时间是:12111 使用 synchronized 花费的时间是 :16471 

AtomXXX类可以保证可见性吗?

可以。

代码如下

package git.snippets.juc;  import java.util.concurrent.atomic.AtomicBoolean;  /**  * AtomXXX类可以保证可见性吗?请写一个程序来证明  *  * @author <a href="mailto:410486047@qq.com">Grey</a>  * @date 2021/4/26  */ public class AtomVisible {     public static void main(String[] args) {         test2();     }      AtomicBoolean running = new AtomicBoolean(true);      void m3() {         System.out.println("m1 start");         while (running.get()) {  //死循环。只有running=false时,才能执行后面的语句          }         System.out.println("m2 end");     }      public static void test2() {         AtomVisible t = new AtomVisible();         new Thread(t::m3, "t1").start();         try {             Thread.sleep(1000);         } catch (InterruptedException e) {             e.printStackTrace();         }         t.running.getAndSet(false);      } } 

输出结果

m1 start m2 end 

写一个程序证明AtomXXX类的多个方法并不构成原子性

package git.snippets.juc;  import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger;  /**  * 写一个程序证明AtomXXX类的多个方法并不构成原子性  *  * @author <a href="mailto:410486047@qq.com">Grey</a>  * @date 2021/4/26  */ public class MultiAtomMethod {     public static void main(String[] args) {         test3();     }      AtomicInteger count = new AtomicInteger(0);      void m4() {         for (int i = 0; i < 10000; i++) {             if (count.get() < 999 && count.get() >= 0) { //如果未加锁,之间还会有其他线程插进来                 count.incrementAndGet();             }         }     }      public static void test3() {         MultiAtomMethod t = new MultiAtomMethod();         List<Thread> threads = new ArrayList<>();         for (int i = 0; i < 100; i++) {             threads.add(new Thread(t::m4, "thread" + i));         }         threads.forEach(Thread::start);         threads.forEach((o) -> {             try {                 //join()方法阻塞调用此方法的线程,直到线程t完成,此线程再继续。通常用于在main()主线程内,等待其它线程完成再结束main()主线程。                 o.join(); //相当于在main线程中同步o线程,o执行完了,main线程才有执行的机会             } catch (InterruptedException e) {                 e.printStackTrace();             }         });         System.out.println(t.count);     } } 

说明

本文涉及到的所有代码和图例

图例

代码

更多内容见:Java 多线程

参考资料

工作线程数究竟要设置为多少 | 架构师之路

实战Java高并发程序设计(第2版)

深入浅出Java多线程

多线程与高并发-马士兵

Java并发编程实战

理解ScheduledExecutorService中scheduleAtFixedRate和scheduleWithFixedDelay的区别

ForkJoinPool 的使用以及原理

聊聊并发(八)——Fork/Join 框架介绍

使用CompletableFuture

图解Java多线程设计模式

发表评论

评论已关闭。

相关文章