1. 是什么
不能重复使用的计数器。让一个线程等待其他线程完事再往下执行,类似于Thread.join()
底层使用AQS实现
2. 如何使用
public class CountDownLatchTest { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) { int finalI = i; new Thread(() -> { try { if (finalI == 5) { TimeUnit.SECONDS.sleep(10L); } System.out.println(String.format("线程%s,时间【%s】 countdown", Thread.currentThread().getName(),LocalDateTime.now())); latch.countDown(); System.out.println(String.format("线程%s,时间【%s】 执行完毕", Thread.currentThread().getName(),LocalDateTime.now())); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } latch.await(); System.out.println(Thread.currentThread().getName() + "开始执行"); } }
- 注意
这里countdown的线程不会互相等待,谁先执行完谁就先退出
2.1. CountDownLatch VS CyclicBarrier
| CountDownLatch | CyclicBarrier | |
|---|---|---|
| 使用场景 | 一个线程等待其他线程执行完毕,再往下执行 | 所有线程相互等待直到最后一个线程到达,再往下执行 |
| 能否重复使用 | 不可以 | 可以 |
| 底层实现 | AQS | Lock+Condition |
3. uml

4. 构造方法
public class CountDownLatch { //继承了AQS private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); //默认就设置了count个信号量(即相当于一开始就加锁了count次) this.sync = new Sync(count); } }
4.1. Sync【AQS子类】
private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); } int getCount() { return getState(); } //重写的是AQS共享获取锁的方法 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //重写的是AQS共享释放锁的方法 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
5. countDown方法
public void countDown() { //调用了AQS的releaseShared方法 sync.releaseShared(1); }
5.1. 使用AQS释放锁
- AQS releaseShared
public final boolean releaseShared(int arg) { //调用Sync重写的tryReleaseShared释放信号量 if (tryReleaseShared(arg)) { //释放锁成功后调用Sync的doReleaseShared方法 doReleaseShared(); return true; } return false; }
- 3行:调用AQS的tryReleaseShared方法释放锁,由于Sync重写了这个方法,所以调用的是Sync重写的tryReleaseShared释放锁。当锁的数量减为0返回ture,表明所有线程都准备就绪
- 5行:使用tryReleaseShared释放锁成功后调用Sync的doReleaseShared方法。移除AQS队列中SIGNAL的节点并一个个唤醒
下面具体说明:
5.1.1. 尝试释放锁
- Sync.tryReleaseShared
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero //不断尝试 for (;;) { //信号量为0,表明还没有人加锁,自然没法解锁,返回失败 int c = getState(); if (c == 0) return false; //CAS设置信号量-1。 int nextc = c-1; if (compareAndSetState(c, nextc)) //看是否为0,是则返回成功 return nextc == 0; } }
5.1.2. 所有锁释放成功后,移除AQS队列中SIGNAL的节点,并一个个唤醒
- doReleaseShared
private void doReleaseShared() { //不断尝试 for (;;) { Node h = head; //AQS队列不为空,把队列中SIGNAL的节点移除 if (h != null && h != tail) { int ws = h.waitStatus; //头节点状态为SIGNAL if (ws == Node.SIGNAL) { //在头节点状态为signal的情况设置为0,失败了继续直到成功 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //把头节点从AQS队列中移除 unparkSuccessor(h); } //头节点状态为0,那么设置为PROPAGATE,失败了继续直到成功 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //队列中没有SIGNAL的节点 if (h == head) // loop if head changed break; } }
5.1.2.1. 把头节点从AQS队列中移除
- unparkSuccessor
private void unparkSuccessor(Node node) { int ws = node.waitStatus; //当前节点的状态<0,则把状态改为0 //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //当前节点的下一个节点为空或者状态>0(即是取消状态) Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; //那么从队尾开始往前遍历找到离当前节点最近的下一个状态<=0的节点(即非取消状态) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //唤醒下一个节点 if (s != null) LockSupport.unpark(s.thread); }
6. await方法
public void await() throws InterruptedException { //调用AQS的acquireSharedInterruptibly方法加锁 sync.acquireSharedInterruptibly(1); }
6.1. 使用AQS加锁
- AQS acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //调用Sync重写的tryAcquireShared判断是否加锁。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
- 6行:调用Sync重写的tryAcquireShared判断是否需要加锁,而不是真的加锁。可以看出当tryAcquireShared返回<0的时候需要往下执行doAcquireSharedInterruptibly进行加锁。
而tryAcquireShared返回<0指的是一开始设置的count个信号量没有被用完,说明其他线程任务没执行完 - 7行:加锁。准确说是加入AQS队列,阻塞等待其他线程执行完
下面详细说明:
6.1.1. 判断是否需要加锁
- Sync tryAcquireShared
protected int tryAcquireShared(int acquires) { //当前锁的数量为0,即所有线程任务都执行完了,那么返回1不用加锁 //否则>0指的是一开始设置的count个信号量没有被用完,说明其他线程任务没执行完。那么该线程需要进行加锁,故返回-1 return (getState() == 0) ? 1 : -1; }
6.1.2. 需要加锁,那么加入AQS队列阻塞等待其他线程执行完
当state>0说明有信号量没被释放完,那么需要加锁
- doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //以SHARE模式加入AQS队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //死循环直到获取锁成功 for (;;) { //逻辑1. //当前节点的前一个节点是头节点的时候(公平锁:即我的前面没有人等待获取锁),尝试获取锁 final Node p = node.predecessor(); if (p == head) { //state == 0(即没人加锁的情况下才执行加锁--其实并没有真的加锁) int r = tryAcquireShared(arg); if (r >= 0) { //获取锁成功后设置头节点为当前节点 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //逻辑2. //当前节点的前一个节点状态时SIGNAL(承诺唤醒当前节点)的时候,阻塞当前线程。 //什么时候唤醒?释放锁的时候 //唤醒之后干什么?继续死循环执行上面的逻辑1 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } //如果发生了异常,那么执行下面的逻辑 } finally { //除了获取锁成功的情况都会执行cancelAcquire方法 if (failed) cancelAcquire(node); } }
6.1.2.1. 构造节点加入AQS队列
- AQS.addWaiter
private Node addWaiter(Node mode) { //用当前线程、SHARED模式构造节点 Node node = new Node(Thread.currentThread(), mode); // 队列不为空 Node pred = tail; if (pred != null) { //插入到队尾 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //队列为空或者插入到队尾失败 enq(node); return node; }
队列为空或者插入到队尾失败的情况执行enq,如下
- AQS.enq
private Node enq(final Node node) { //死循环直到入队成功 for (;;) { Node t = tail; //队列为空,那么初始化头节点。注意是new Node而不是当前node(即队头是个占位符) if (t == null) { if (compareAndSetHead(new Node())) tail = head; //队列不为空,插入到队尾 } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
6.1.2.2. 判断是否需要阻塞
- shouldParkAfterFailedAcquire
//根据(前一个节点,当前节点)->是否阻塞当前线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //前一个节点的状态时SIGNAL,即释放锁后承诺唤醒当前节点,那么返回true可以阻塞当前线程 if (ws == Node.SIGNAL) return true; //前一个节点状态>0,即CANCEL。 //那么往前遍历找到没有取消的前置节点。同时从链表中移除CANCEL状态的节点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; // 前置节点状态>=0,即0或者propagate。 //这里通过CAS把前置节点状态改成signal成功获取锁,失败的话再阻塞。why? } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
6.1.2.3. 真正阻塞
- parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() { //使用Unsafe阻塞当前线程,这里会清除线程中断的标记,因此需要返回中断的标记 LockSupport.park(this); return Thread.interrupted(); }
6.1.3. 不需要加锁
当state=0说明所有信号量已被释放完,那么直接返回,执行业务逻辑
7. 总结
- 让一个线程等待其他线程完事再往下执行,类似于Thread.join()
- 主线程创建CountDownLatch的时候初始化了信号量,相当于一开始就有N个人加锁。
- 主线程调用await的时候检查信号量是否为0,不为0说明其他线程没有执行完,那么加入AQS队列阻塞,等待唤醒
- 其他线程调用countDown的时候会使信号量-1,最后一个线程减为0的时候会唤醒AQS队列中的所有节点(主线程),让其继续往下执行
- 主线程被唤醒继续往下执行