无论是项目开发还是开源代码阅读,多线程都是不可或缺的一个重要知识点,基于这个考量,于是总结出本篇文章,讨论闭锁(CountDownLatch)、栅栏(CyclicBarrier)与异步编排(CompletableFuture)
@Author:Akai-yuan
@更新时间:2023/2/4
1.CountDownLatch
1.适用场景
- 协调子线程结束动作:等待所有子线程运行结束
主线程创建了5个子线程,各子任务执行确认动作,期间主线程进入等待状态,直到各子线程的任务均已经完成,主线程恢复继续执行。
- 协调子线程开始动作:统一各线程动作开始的时机
从多线程的角度看,这恰似你创建了一些多线程,但是你需要统一管理它们的任务开始时间。
2.设计思想
CountDownLatch基于一个同步器实现,并且只有CountDownLatch(int count)一个构造器,指定数量count不得在中途修改它。
核心函数
- await():等待latch降为0;
- boolean await(long timeout, TimeUnit unit):等待latch降为0,但是可以设置超时时间。
- countDown():latch数量减1;
- getCount():获取当前的latch数量。
3.场景实例
场景1. 对各子线程的等待
public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(4); Thread t1 = new Thread(countDownLatch::countDown); Thread t2 = new Thread(countDownLatch::countDown); Thread t3 = new Thread(countDownLatch::countDown); Thread t4 = new Thread(() -> { try { // 稍等... Thread.sleep(1500); countDownLatch.countDown(); } catch (InterruptedException ignored) {} }); t1.start(); t2.start(); t3.start(); t4.start(); //直到所有线程都对计数器进行减一后,这里才放行 countDownLatch.await(); System.out.println("所有子线程就位,可以继续执行其他任务"); }
场景2. 对多线程的统一管理
我们仍然用4个线程调用了start(),但是它们在运行时都在等待countDownLatch的信号,在信号未收到前,它们不会往下执行。
public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); Thread t1 = new Thread(() -> waitForCountDown(countDownLatch)); Thread t2 = new Thread(() -> waitForCountDown(countDownLatch)); Thread t3 = new Thread(() -> waitForCountDown(countDownLatch)); Thread t4 = new Thread(() -> waitForCountDown(countDownLatch)); t1.start(); t2.start(); t3.start(); t4.start(); Thread.sleep(1000); countDownLatch.countDown(); System.out.println("所有线程准备完成"); } private static void waitForCountDown(CountDownLatch countDownLatch) { try { // 等待信号 countDownLatch.await(); System.out.println("本线程等待完毕"); } catch (InterruptedException e) { e.printStackTrace(); } }
输出:
所有线程准备完成 本线程等待完毕 本线程等待完毕 本线程等待完毕 本线程等待完毕 Process finished with exit code 0
场景3. SOFAJRaft的实践
// 定义一个CountDownLatch计数器 private final CountDownLatch startTimeInitialized = new CountDownLatch(1); public void start() { switch (workerStateUpdater.get(this)) { case WORKER_STATE_INIT: if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { //此处调用工作线程执行CountDownLatch的countDown()方法 //即startTimeInitialized.countDown(); workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // 等待startTime被工作线程初始化完成 while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
2.CyclicBarrier
1.适用场景
栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。
CyclicBarrier与CountDownLatch的区别
| CyclicBarrier | CountDownLatch |
|---|---|
| CyclicBarrier是可重用的,其中的线程会等待所有的线程完成任务。届时,屏障将被拆除,并可以选择性地做一些特定的动作。 | CountDownLatch是一次性的,不同的线程在同一个计数器上工作,直到计数器为0 |
| CyclicBarrier面向的是线程数 | CountDownLatch面向的是任务数 |
| 在使用CyclicBarrier时,你必须在构造中指定参与协作的线程数,这些线程必须调用await()方法 | 使用CountDownLatch时,则必须要指定任务数,至于这些任务由哪些线程完成无关紧要 |
| CyclicBarrier可以在所有的线程释放后重新使用 | CountDownLatch在计数器为0时不能再使用 |
| 在CyclicBarrier中,如果某个线程遇到了中断、超时等问题时,则处于await的线程都会出现问题 | 在CountDownLatch中,如果某个线程出现问题,其他线程不受影响 |
2.设计思想
1.构造器
// 指定参与方的数量; public CyclicBarrier(int parties); // 指定参与方的数量,并指定在本代次结束时运行的代码 public CyclicBarrier(int parties, Runnable barrierAction):
2.核心方法
//如果当前线程不是第一个到达屏障的话,它将会进入等待,直到其他线程都到达 //除非发生被中断、屏障被拆除、屏障被重设等情况 public int await(); //和await()类似,但是加上了时间限制; public int await(long timeout, TimeUnit unit); //当前屏障是否被拆除; public boolean isBroken(); //重设当前屏障。会先拆除屏障再设置新的屏障 public void reset(); //正在等待的线程数量 public int getNumberWaiting();
3.场景实例
下面以一个简单的日常对话来讲解CyclicBarrier的使用实例
private static String appointmentPlace = "书房"; public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("yuan所在的地点:" + appointmentPlace)); // 线程Akai Thread Akai = newThread("Akai", () -> { System.out.println("yuan,饭好了快来吃饭..."); try { // 此时Akai在屏障前等待 cyclicBarrier.await(); System.out.println("yuan,你来了..."); // 开始吃饭... Thread.sleep(2600); System.out.println("好的,你去洗你的碗吧!"); // 第二次调用await cyclicBarrier.await(); Thread.sleep(100); System.out.println("好吧,你这个懒猪!"); } catch (Exception e) { e.printStackTrace(); } }); // 线程yuan Thread yuan = newThread("yuan", () -> { try { // yuan在敲代码 Thread.sleep(500); System.out.println("我在敲代码,我马上就来!"); // yuan到达饭桌前 cyclicBarrier.await(); Thread.sleep(500); System.out.println("Akai,不好意思,刚刚沉迷于敲代码了!"); // 开始吃饭... Thread.sleep(1500); // yuan想先吃完赶快洗碗然后溜出去敲代码 System.out.println("我吃完了,我要去洗碗了"); // yuan把地点改成了厨房 appointmentPlace = "厨房"; // 洗碗中... Thread.sleep(1500); System.out.println("︎yuan终于洗完自己的碗了"); // 第二次调用await cyclicBarrier.await(); System.out.println("Akai你吃完了,你的碗自己去洗吧,我已经在敲代码了"); } catch (Exception ignored) {} }); Akai.start(); yuan.start(); }
输出结果:
yuan,饭好了快来吃饭... 我在敲代码,我马上就来! yuan所在的地点:书房 yuan,你来了... Akai,不好意思,刚刚沉迷于敲代码了! 我吃完了,我要去洗碗了 好的,你去洗你的碗吧! yuan终于洗完自己的碗了 yuan所在的地点:厨房 Akai你吃完了,你的碗自己去洗吧,我已经在敲代码了 好吧,你这个懒猪!
3.CompletableFuture
1.设计思想
1.Future的局限性
- 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
- 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
- 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
- 没有异常处理:Future接口中没有关于异常处理的方法;
2.Completable有哪些优势
CompletableFuture是Future接口的扩展和增强。
CompletableFuture完整地继承了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是,CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。
从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
2.核心设计
我们首先来讨论CompletableFuture的核心:CompletionStage
顾名思义,根据CompletionStage名字中的"Stage",你可以把它理解为任务编排中的步骤。步骤,即任务编排的基本单元,它可以是一次纯粹的计算或者是一个特定的动作。在一次编排中,会包含多个步骤,这些步骤之间会存在依赖、链式和组合等不同的关系,也存在并行和串行的关系。这种关系,类似于Pipeline或者流式计算。
既然是编排,就需要维护任务的创建、建立计算关系。为此,CompletableFuture提供了多达50多个方法,但没有必要全部完全理解,但我们可以通过分类的方式简化对方法的理解,理解了类型和变种,基本上我们也就掌握了CompletableFuture的核心能力。
这些方法可以总结为以下四类,其他大部分方法都是基于这四种类型的变种:

3.核心用法
1.runAsync
- runAsync()是CompletableFuture最常用的方法之一,它可以接收一个待运行的任务并返回一个CompletableFuture
- 当我们想异步运行某个任务时,在以往需要手动实现Thread或者借助Executor实现。而通过runAsync()`就简单多了。比如,我们可以直接传入Runnable类型的任务:
CompletableFuture.runAsync(new Runnable() { @Override public void run() { System.out.println("something"); } });
2.supply与supplyAsync
- 所谓supply表示提供结果,换句话说当我们使用supply()时,就表明我们会返回一个结果,并且这个结果可以被后续的任务所使用。
// 创建nameFuture,返回姓名 CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> { return "Akai-yuan"; }); // 使用thenApply()接收nameFuture的结果,并执行回调动作 CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> { return "love you," + name; }); //阻塞获得表白的结果 System.out.println(sayLoveFuture.get()); // love you,Akai-yuan
一旦理解了supply()的含义,它也就如此简单。如果你希望用新的线程运行任务,可以使用supplyAsync().
3.thenApply与thenApplyAsync
- 我们已经知道supply()是用于提供结果的,并且顺带提了thenApply()。很明显,thenApply()是supply()的搭档,用于接收supply()的执行结果,并执行特定的代码逻辑,最后返回CompletableFuture结果。
// 使用thenApply()接收nameFuture的结果,并执行回调动作 CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> { return "爱你," + name; }); public <U> CompletableFuture <U> thenApplyAsync( Function <? super T, ? extends U> fn) { return uniApplyStage(null, fn); }
4.thenAccept与thenAcceptAsync
作为supply()的档案,thenApply()并不是唯一的存在,thenAccept()也是。但与thenApply()不同,thenAccept()只接收数据,但不会返回,它的返回类型是Void.
CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> { System.out.println("爱你," + name); }); public CompletableFuture < Void > thenAccept(Consumer < ? super T > action) { return uniAcceptStage(null, action); }
5.thenRun
thenRun()就比较简单了,不接收任务的结果,只运行特定的任务,并且也不返回结果。
public CompletableFuture < Void > thenRun(Runnable action) { return uniRunStage(null, action); }
所以,如果你在回调中不想返回任何的结果,只运行特定的逻辑,那么你可以考虑使用thenAccept和thenRun一般来说,这两个方法会在调用链的最后面使用。
6.thenCompose与 thenCombine
以上几种方法都是各玩各的,但thenCompose()与thenCombine()就不同了,它们可以实现对依赖和非依赖两种类型的任务的编排。
编排两个存在依赖关系的任务
在前面的例子中,在接收前面任务的结果时,我们使用的是thenApply(). 也就是说,sayLoveFuture在执行时必须依赖nameFuture的完成,否则执行个锤子。
// 创建Future CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> { return "Akai-yuan"; }); // 使用thenApply()接收nameFuture的结果,并执行回调动作 CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> { return "爱你," + name; });
但其实,除了thenApply()之外,我们还可以使用thenCompose()来编排两个存在依赖关系的任务。比如,上面的示例代码可以写成:
// 创建Future CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> { return "Akai-yuan"; }); CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> { return CompletableFuture.supplyAsync(() -> "爱你," + name); });
可以看到,thenCompose()和thenApply()的核心不同之处在于它们的返回值类型:
- thenApply():返回计算结果的原始类型,比如返回String;
- thenCompose():返回CompletableFuture类型,比如返回CompletableFuture.
组合两个相互独立的任务
考虑一个场景,当我们在执行某个任务时,需要其他任务就绪才可以,应该怎么做?这样的场景并不少见,我们可以使用前面学过的并发工具类实现,也可以使用thenCombine()实现。
举个例子,当我们计算某个胜率时,我们需要获取她参与的总场次(rounds),以及获胜的场次(winRounds),然后再通过winRounds / rounds来计算。对于这个计算,我们可以这么做:
CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> 500); CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> 365); CompletableFuture < Object > winRateFuture = roundsFuture .thenCombine(winRoundsFuture, (rounds, winRounds) -> { if (rounds == 0) { return 0.0; } DecimalFormat df = new DecimalFormat("0.00"); return df.format((float) winRounds / rounds); }); System.out.println(winRateFuture.get());
thenCombine()将另外两个任务的结果同时作为参数,参与到自己的计算逻辑中。在另外两个参数未就绪时,它将会处于等待状态。
7.allOf与anyOf
allOf()与anyOf()也是一对孪生兄弟,当我们需要对多个Future的运行进行组织时,就可以考虑使用它们:
- allOf():给定一组任务,等待所有任务执行结束;
- anyOf():给定一组任务,等待其中任一任务执行结束。
allOf()与anyOf()的方法签名如下:
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
需要注意的是,anyOf()将返回完任务的执行结果,但是allOf()不会返回任何结果,它的返回值是Void.
allOf()与anyOf()的示例代码如下所示。我们创建了roundsFuture和winRoundsFuture,并通过sleep模拟它们的执行时间。在执行时,winRoundsFuture将会先返回结果,所以当我们调用 CompletableFuture.anyOf时也会发现输出的是365.
CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(200); return 500; } catch (InterruptedException e) { return null; } }); CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(100); return 365; } catch (InterruptedException e) { return null; } }); CompletableFuture < Object > completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture); System.out.println(completedFuture.get()); // 返回365 CompletableFuture < Void > completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture);
在CompletableFuture之前,如果要实现所有任务结束后执行特定的动作,我们可以考虑CountDownLatch等工具类。现在,则多了一选项,我们也可以考虑使用CompletableFuture.allOf.
8.异常处理
在CompletableFuture链式调用中,如果某个任务发生了异常,那么后续的任务将都不会再执行。对于异常,我们有两种处理方式:exceptionally()和handle().
1.使用exceptionally()回调处理异常
在链式调用的尾部使用exceptionally(),捕获异常并返回错误情况下的默认值。需要注意的是,exceptionally()仅在发生异常时才会调用。
CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture .thenCombine(winRoundsFuture, (rounds, winRounds) -> { if (rounds == 0) { throw new RuntimeException("总场次错误"); } DecimalFormat df = new DecimalFormat("0.00"); return df.format((float) winRounds / rounds); }).exceptionally(ex -> { System.out.println("出错:" + ex.getMessage()); return ""; }); System.out.println(winRateFuture.get());
2. 使用handle()处理异常
除了exceptionally(),CompletableFuture也提供了handle()来处理异常。不过,与exceptionally()不同的是,当我们在调用链中使用了handle(),那么无论是否发生异常,都会调用它。所以,在handle()方法的内部,我们需要通过 if (ex != null) 来判断是否发生了异常。
CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture .thenCombine(winRoundsFuture, (rounds, winRounds) -> { if (rounds == 0) { throw new RuntimeException("总场次错误"); } DecimalFormat df = new DecimalFormat("0.00"); return df.format((float) winRounds / rounds); }).handle((res, ex) -> { if (ex != null) { System.out.println("出错:" + ex.getMessage()); return ""; } return res; }); System.out.println(winRateFuture.get());
当然,如果我们允许某个任务发生异常而不中断整个调用链路,那么可以在其内部通过try-catch消化掉。