探秘多线程-闭锁、栅栏与异步编排

无论是项目开发还是开源代码阅读,多线程都是不可或缺的一个重要知识点,基于这个考量,于是总结出本篇文章,讨论闭锁(CountDownLatch)、栅栏(CyclicBarrier)与异步编排(CompletableFuture)
@Author:Akai-yuan
@更新时间:2023/2/4

1.CountDownLatch

1.适用场景

  1. 协调子线程结束动作:等待所有子线程运行结束

主线程创建了5个子线程,各子任务执行确认动作,期间主线程进入等待状态,直到各子线程的任务均已经完成,主线程恢复继续执行。

  1. 协调子线程开始动作:统一各线程动作开始的时机

从多线程的角度看,这恰似你创建了一些多线程,但是你需要统一管理它们的任务开始时间。

2.设计思想

CountDownLatch基于一个同步器实现,并且只有CountDownLatch(int count)一个构造器,指定数量count不得在中途修改它。
核心函数

  • await():等待latch降为0;
  • boolean await(long timeout, TimeUnit unit):等待latch降为0,但是可以设置超时时间。
  • countDown():latch数量减1;
  • getCount():获取当前的latch数量。

3.场景实例

场景1. 对各子线程的等待

public static void main(String[] args) throws InterruptedException {     CountDownLatch countDownLatch = new CountDownLatch(4);      Thread t1 = new Thread(countDownLatch::countDown);     Thread t2 = new Thread(countDownLatch::countDown);     Thread t3 = new Thread(countDownLatch::countDown);     Thread t4 = new Thread(() -> {         try {             // 稍等...             Thread.sleep(1500);             countDownLatch.countDown();         } catch (InterruptedException ignored) {}     });      t1.start();     t2.start();     t3.start();     t4.start(); 		//直到所有线程都对计数器进行减一后,这里才放行     countDownLatch.await();     System.out.println("所有子线程就位,可以继续执行其他任务"); } 

场景2. 对多线程的统一管理

我们仍然用4个线程调用了start(),但是它们在运行时都在等待countDownLatch的信号,在信号未收到前,它们不会往下执行。

public static void main(String[] args) throws InterruptedException {     CountDownLatch countDownLatch = new CountDownLatch(1);      Thread t1 = new Thread(() -> waitForCountDown(countDownLatch));     Thread t2 = new Thread(() -> waitForCountDown(countDownLatch));     Thread t3 = new Thread(() -> waitForCountDown(countDownLatch));     Thread t4 = new Thread(() -> waitForCountDown(countDownLatch));      t1.start();     t2.start();     t3.start();     t4.start();     Thread.sleep(1000);     countDownLatch.countDown();     System.out.println("所有线程准备完成"); }  private static void waitForCountDown(CountDownLatch countDownLatch) {     try { 			// 等待信号         countDownLatch.await();          System.out.println("本线程等待完毕");     } catch (InterruptedException e) {         e.printStackTrace();     } } 

输出:

所有线程准备完成 本线程等待完毕 本线程等待完毕 本线程等待完毕 本线程等待完毕  Process finished with exit code 0 

场景3. SOFAJRaft的实践

// 定义一个CountDownLatch计数器 private final CountDownLatch startTimeInitialized = new CountDownLatch(1);  public void start() {     switch (workerStateUpdater.get(this)) {         case WORKER_STATE_INIT:             if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {                 //此处调用工作线程执行CountDownLatch的countDown()方法                 //即startTimeInitialized.countDown();                 workerThread.start();             }             break;         case WORKER_STATE_STARTED:             break;         case WORKER_STATE_SHUTDOWN:             throw new IllegalStateException("cannot be started once stopped");         default:             throw new Error("Invalid WorkerState");     }      // 等待startTime被工作线程初始化完成     while (startTime == 0) {         try {             startTimeInitialized.await();         } catch (InterruptedException ignore) {             // Ignore - it will be ready very soon.         }     } } 

2.CyclicBarrier

1.适用场景

栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier与CountDownLatch的区别

CyclicBarrier CountDownLatch
CyclicBarrier是可重用的,其中的线程会等待所有的线程完成任务。届时,屏障将被拆除,并可以选择性地做一些特定的动作。 CountDownLatch是一次性的,不同的线程在同一个计数器上工作,直到计数器为0
CyclicBarrier面向的是线程数 CountDownLatch面向的是任务数
在使用CyclicBarrier时,你必须在构造中指定参与协作的线程数,这些线程必须调用await()方法 使用CountDownLatch时,则必须要指定任务数,至于这些任务由哪些线程完成无关紧要
CyclicBarrier可以在所有的线程释放后重新使用 CountDownLatch在计数器为0时不能再使用
在CyclicBarrier中,如果某个线程遇到了中断、超时等问题时,则处于await的线程都会出现问题 在CountDownLatch中,如果某个线程出现问题,其他线程不受影响

2.设计思想

1.构造器

// 指定参与方的数量; public CyclicBarrier(int parties); // 指定参与方的数量,并指定在本代次结束时运行的代码 public CyclicBarrier(int parties, Runnable barrierAction): 

2.核心方法

//如果当前线程不是第一个到达屏障的话,它将会进入等待,直到其他线程都到达 //除非发生被中断、屏障被拆除、屏障被重设等情况 public int await(); //和await()类似,但是加上了时间限制; public int await(long timeout, TimeUnit unit); //当前屏障是否被拆除; public boolean isBroken(); //重设当前屏障。会先拆除屏障再设置新的屏障 public void reset(); //正在等待的线程数量 public int getNumberWaiting(); 

3.场景实例

下面以一个简单的日常对话来讲解CyclicBarrier的使用实例

private static String appointmentPlace = "书房";  public static void main(String[] args) {   CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("yuan所在的地点:" + appointmentPlace));     // 线程Akai     Thread Akai = newThread("Akai", () -> {     System.out.println("yuan,饭好了快来吃饭...");     try {       // 此时Akai在屏障前等待       cyclicBarrier.await();       System.out.println("yuan,你来了...");       // 开始吃饭...       Thread.sleep(2600);        System.out.println("好的,你去洗你的碗吧!");       // 第二次调用await       cyclicBarrier.await();        Thread.sleep(100);       System.out.println("好吧,你这个懒猪!");     } catch (Exception e) {       e.printStackTrace();     }   });   // 线程yuan   Thread yuan = newThread("yuan", () -> {     try {       // yuan在敲代码       Thread.sleep(500);        System.out.println("我在敲代码,我马上就来!");       // yuan到达饭桌前       cyclicBarrier.await();        Thread.sleep(500);        System.out.println("Akai,不好意思,刚刚沉迷于敲代码了!");       // 开始吃饭...       Thread.sleep(1500);        // yuan想先吃完赶快洗碗然后溜出去敲代码       System.out.println("我吃完了,我要去洗碗了");        // yuan把地点改成了厨房       appointmentPlace = "厨房";        // 洗碗中...       Thread.sleep(1500);        System.out.println("︎yuan终于洗完自己的碗了");       // 第二次调用await       cyclicBarrier.await();       System.out.println("Akai你吃完了,你的碗自己去洗吧,我已经在敲代码了");     } catch (Exception ignored) {}   });    Akai.start();   yuan.start(); } 

输出结果:

yuan,饭好了快来吃饭... 我在敲代码,我马上就来! yuan所在的地点:书房 yuan,你来了... Akai,不好意思,刚刚沉迷于敲代码了! 我吃完了,我要去洗碗了 好的,你去洗你的碗吧! yuan终于洗完自己的碗了 yuan所在的地点:厨房 Akai你吃完了,你的碗自己去洗吧,我已经在敲代码了 好吧,你这个懒猪!     

3.CompletableFuture

1.设计思想

1.Future的局限性

  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理:Future接口中没有关于异常处理的方法;

2.Completable有哪些优势

CompletableFuture是Future接口的扩展和增强
CompletableFuture完整地继承了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。
从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。

2.核心设计

我们首先来讨论CompletableFuture的核心:CompletionStage
顾名思义,根据CompletionStage名字中的"Stage",你可以把它理解为任务编排中的步骤。步骤,即任务编排的基本单元,它可以是一次纯粹的计算或者是一个特定的动作。在一次编排中,会包含多个步骤,这些步骤之间会存在依赖链式组合等不同的关系,也存在并行串行的关系。这种关系,类似于Pipeline或者流式计算。
既然是编排,就需要维护任务的创建、建立计算关系。为此,CompletableFuture提供了多达50多个方法,但没有必要全部完全理解,但我们可以通过分类的方式简化对方法的理解,理解了类型变种,基本上我们也就掌握了CompletableFuture的核心能力。
这些方法可以总结为以下四类,其他大部分方法都是基于这四种类型的变种:
探秘多线程-闭锁、栅栏与异步编排

3.核心用法

1.runAsync

  • runAsync()是CompletableFuture最常用的方法之一,它可以接收一个待运行的任务并返回一个CompletableFuture
  • 当我们想异步运行某个任务时,在以往需要手动实现Thread或者借助Executor实现。而通过runAsync()`就简单多了。比如,我们可以直接传入Runnable类型的任务:
CompletableFuture.runAsync(new Runnable() {     @Override     public void run() {         System.out.println("something");     } }); 

2.supply与supplyAsync

  • 所谓supply表示提供结果,换句话说当我们使用supply()时,就表明我们会返回一个结果,并且这个结果可以被后续的任务所使用。
// 创建nameFuture,返回姓名 CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {     return "Akai-yuan"; });   // 使用thenApply()接收nameFuture的结果,并执行回调动作 CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {     return "love you," + name; });  //阻塞获得表白的结果 System.out.println(sayLoveFuture.get()); // love you,Akai-yuan 

一旦理解了supply()的含义,它也就如此简单。如果你希望用新的线程运行任务,可以使用supplyAsync().

3.thenApply与thenApplyAsync

  • 我们已经知道supply()是用于提供结果的,并且顺带提了thenApply()。很明显,thenApply()supply()的搭档,用于接收supply()的执行结果,并执行特定的代码逻辑,最后返回CompletableFuture结果
 // 使用thenApply()接收nameFuture的结果,并执行回调动作 CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {     return "爱你," + name; });  public <U> CompletableFuture <U> thenApplyAsync(     Function <? super T, ? extends U> fn) {     return uniApplyStage(null, fn); } 

4.thenAccept与thenAcceptAsync

作为supply()的档案,thenApply()并不是唯一的存在,thenAccept()也是。但与thenApply()不同,thenAccept()只接收数据,但不会返回,它的返回类型是Void.

CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> {      System.out.println("爱你," + name); });          public CompletableFuture < Void > thenAccept(Consumer < ? super T > action) {     return uniAcceptStage(null, action); } 

5.thenRun

thenRun()就比较简单了,不接收任务的结果,只运行特定的任务,并且也不返回结果。

public CompletableFuture < Void > thenRun(Runnable action) {    return uniRunStage(null, action); } 

所以,如果你在回调中不想返回任何的结果,只运行特定的逻辑,那么你可以考虑使用thenAcceptthenRun一般来说,这两个方法会在调用链的最后面使用。

6.thenCompose与 thenCombine

以上几种方法都是各玩各的,但thenCompose()与thenCombine()就不同了,它们可以实现对依赖非依赖两种类型的任务的编排。
编排两个存在依赖关系的任务
在前面的例子中,在接收前面任务的结果时,我们使用的是thenApply(). 也就是说,sayLoveFuture在执行时必须依赖nameFuture的完成,否则执行个锤子。

// 创建Future CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {     return "Akai-yuan"; });   // 使用thenApply()接收nameFuture的结果,并执行回调动作 CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {     return "爱你," + name; }); 

但其实,除了thenApply()之外,我们还可以使用thenCompose()来编排两个存在依赖关系的任务。比如,上面的示例代码可以写成:

// 创建Future CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {     return "Akai-yuan"; });  CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> {     return CompletableFuture.supplyAsync(() -> "爱你," + name); }); 

可以看到,thenCompose()和thenApply()的核心不同之处在于它们的返回值类型

  • thenApply():返回计算结果的原始类型,比如返回String;
  • thenCompose():返回CompletableFuture类型,比如返回CompletableFuture.

组合两个相互独立的任务
考虑一个场景,当我们在执行某个任务时,需要其他任务就绪才可以,应该怎么做?这样的场景并不少见,我们可以使用前面学过的并发工具类实现,也可以使用thenCombine()实现。
举个例子,当我们计算某个胜率时,我们需要获取她参与的总场次(rounds),以及获胜的场次(winRounds),然后再通过winRounds / rounds来计算。对于这个计算,我们可以这么做:

CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> 500); CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);  CompletableFuture < Object > winRateFuture = roundsFuture     .thenCombine(winRoundsFuture, (rounds, winRounds) -> {         if (rounds == 0) {             return 0.0;         }         DecimalFormat df = new DecimalFormat("0.00");         return df.format((float) winRounds / rounds);     }); System.out.println(winRateFuture.get()); 

thenCombine()将另外两个任务的结果同时作为参数,参与到自己的计算逻辑中。在另外两个参数未就绪时,它将会处于等待状态。

7.allOf与anyOf

allOf()与anyOf()也是一对孪生兄弟,当我们需要对多个Future的运行进行组织时,就可以考虑使用它们:

  • allOf():给定一组任务,等待所有任务执行结束;
  • anyOf():给定一组任务,等待其中任一任务执行结束。

allOf()与anyOf()的方法签名如下:

static CompletableFuture<Void>	 allOf(CompletableFuture<?>... cfs) static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 

需要注意的是,anyOf()将返回完任务的执行结果,但是allOf()不会返回任何结果,它的返回值是Void.
allOf()与anyOf()的示例代码如下所示。我们创建了roundsFuture和winRoundsFuture,并通过sleep模拟它们的执行时间。在执行时,winRoundsFuture将会先返回结果,所以当我们调用 CompletableFuture.anyOf时也会发现输出的是365.

 CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {    try {      Thread.sleep(200);      return 500;    } catch (InterruptedException e) {      return null;    }  });  CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> {    try {      Thread.sleep(100);      return 365;    } catch (InterruptedException e) {      return null;    }  });   CompletableFuture < Object > completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture);  System.out.println(completedFuture.get()); // 返回365   CompletableFuture < Void > completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture); 

在CompletableFuture之前,如果要实现所有任务结束后执行特定的动作,我们可以考虑CountDownLatch等工具类。现在,则多了一选项,我们也可以考虑使用CompletableFuture.allOf.

8.异常处理

在CompletableFuture链式调用中,如果某个任务发生了异常,那么后续的任务将都不会再执行。对于异常,我们有两种处理方式:exceptionally()handle().

1.使用exceptionally()回调处理异常

在链式调用的尾部使用exceptionally(),捕获异常并返回错误情况下的默认值。需要注意的是,exceptionally()仅在发生异常时才会调用

CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture     .thenCombine(winRoundsFuture, (rounds, winRounds) -> {         if (rounds == 0) {             throw new RuntimeException("总场次错误");         }         DecimalFormat df = new DecimalFormat("0.00");         return df.format((float) winRounds / rounds);     }).exceptionally(ex -> {         System.out.println("出错:" + ex.getMessage());         return "";     }); System.out.println(winRateFuture.get()); 
2. 使用handle()处理异常

除了exceptionally(),CompletableFuture也提供了handle()来处理异常。不过,与exceptionally()不同的是,当我们在调用链中使用了handle(),那么无论是否发生异常,都会调用它。所以,在handle()方法的内部,我们需要通过 if (ex != null) 来判断是否发生了异常。

CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture     .thenCombine(winRoundsFuture, (rounds, winRounds) -> {         if (rounds == 0) {             throw new RuntimeException("总场次错误");         }         DecimalFormat df = new DecimalFormat("0.00");         return df.format((float) winRounds / rounds);     }).handle((res, ex) -> {         if (ex != null) {             System.out.println("出错:" + ex.getMessage());             return "";         }         return res;     }); System.out.println(winRateFuture.get()); 

当然,如果我们允许某个任务发生异常而不中断整个调用链路,那么可以在其内部通过try-catch消化掉。

发表评论

评论已关闭。

相关文章