Java源码分析系列笔记-9.CountDownLatch

1. 是什么

不能重复使用的计数器。让一个线程等待其他线程完事再往下执行,类似于Thread.join()
底层使用AQS实现

2. 如何使用

public class CountDownLatchTest {     public static void main(String[] args) throws InterruptedException     {         CountDownLatch latch = new CountDownLatch(10);         for (int i = 0; i < 10; i++)         {             int finalI = i;             new Thread(() -> {                 try                 {                     if (finalI == 5)                     {                         TimeUnit.SECONDS.sleep(10L);                     }                     System.out.println(String.format("线程%s,时间【%s】 countdown", Thread.currentThread().getName(),LocalDateTime.now()));                      latch.countDown();                     System.out.println(String.format("线程%s,时间【%s】 执行完毕", Thread.currentThread().getName(),LocalDateTime.now()));                  }                 catch (InterruptedException e)                 {                     e.printStackTrace();                 }                              }).start();         }                           latch.await();         System.out.println(Thread.currentThread().getName() + "开始执行");     } }  
  • 注意
    这里countdown的线程不会互相等待,谁先执行完谁就先退出

2.1. CountDownLatch VS CyclicBarrier

CountDownLatch CyclicBarrier
使用场景 一个线程等待其他线程执行完毕,再往下执行 所有线程相互等待直到最后一个线程到达,再往下执行
能否重复使用 不可以 可以
底层实现 AQS Lock+Condition

3. uml

Java源码分析系列笔记-9.CountDownLatch

4. 构造方法

public class CountDownLatch {  	//继承了AQS 	private final Sync sync;  	public CountDownLatch(int count) { 	    if (count < 0) throw new IllegalArgumentException("count < 0"); 	    //默认就设置了count个信号量(即相当于一开始就加锁了count次) 	    this.sync = new Sync(count); 	}	  } 

4.1. Sync【AQS子类】

private static final class Sync extends AbstractQueuedSynchronizer {      Sync(int count) {         setState(count);     }      int getCount() {         return getState();     }  	//重写的是AQS共享获取锁的方法     protected int tryAcquireShared(int acquires) {         return (getState() == 0) ? 1 : -1;     }  	//重写的是AQS共享释放锁的方法     protected boolean tryReleaseShared(int releases) {         // Decrement count; signal when transition to zero         for (;;) {             int c = getState();             if (c == 0)                 return false;             int nextc = c-1;             if (compareAndSetState(c, nextc))                 return nextc == 0;         }     } } 

5. countDown方法

public void countDown() { 	//调用了AQS的releaseShared方法     sync.releaseShared(1); } 

5.1. 使用AQS释放锁

  • AQS releaseShared
public final boolean releaseShared(int arg) { 	//调用Sync重写的tryReleaseShared释放信号量     if (tryReleaseShared(arg)) {     	//释放锁成功后调用Sync的doReleaseShared方法         doReleaseShared();         return true;     }     return false; }  
  • 3行:调用AQS的tryReleaseShared方法释放锁,由于Sync重写了这个方法,所以调用的是Sync重写的tryReleaseShared释放锁。当锁的数量减为0返回ture,表明所有线程都准备就绪
  • 5行:使用tryReleaseShared释放锁成功后调用Sync的doReleaseShared方法。移除AQS队列中SIGNAL的节点并一个个唤醒

下面具体说明:

5.1.1. 尝试释放锁

  • Sync.tryReleaseShared
protected boolean tryReleaseShared(int releases) {     // Decrement count; signal when transition to zero     //不断尝试     for (;;) {     	//信号量为0,表明还没有人加锁,自然没法解锁,返回失败         int c = getState();         if (c == 0)             return false;         //CAS设置信号量-1。         int nextc = c-1;         if (compareAndSetState(c, nextc))         	//看是否为0,是则返回成功             return nextc == 0;     } } 

5.1.2. 所有锁释放成功后,移除AQS队列中SIGNAL的节点,并一个个唤醒

  • doReleaseShared
private void doReleaseShared() {     //不断尝试     for (;;) {     	         Node h = head;         //AQS队列不为空,把队列中SIGNAL的节点移除         if (h != null && h != tail) {             int ws = h.waitStatus;             //头节点状态为SIGNAL             if (ws == Node.SIGNAL) {             	//在头节点状态为signal的情况设置为0,失败了继续直到成功                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                     continue;            // loop to recheck cases                 //把头节点从AQS队列中移除                 unparkSuccessor(h);             }             //头节点状态为0,那么设置为PROPAGATE,失败了继续直到成功             else if (ws == 0 &&                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                 continue;                // loop on failed CAS         }          //队列中没有SIGNAL的节点         if (h == head)                   // loop if head changed             break;     } } 
5.1.2.1. 把头节点从AQS队列中移除
  • unparkSuccessor
private void unparkSuccessor(Node node) {         int ws = node.waitStatus;         //当前节点的状态<0,则把状态改为0         //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何         if (ws < 0)             compareAndSetWaitStatus(node, ws, 0);              //当前节点的下一个节点为空或者状态>0(即是取消状态)         Node s = node.next;         if (s == null || s.waitStatus > 0) {             s = null;             //那么从队尾开始往前遍历找到离当前节点最近的下一个状态<=0的节点(即非取消状态)             for (Node t = tail; t != null && t != node; t = t.prev)                 if (t.waitStatus <= 0)                     s = t;         }     	//唤醒下一个节点         if (s != null)             LockSupport.unpark(s.thread);     } 

6. await方法

public void await() throws InterruptedException { 	//调用AQS的acquireSharedInterruptibly方法加锁     sync.acquireSharedInterruptibly(1); } 

6.1. 使用AQS加锁

  • AQS acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)         throws InterruptedException {     if (Thread.interrupted())         throw new InterruptedException();     //调用Sync重写的tryAcquireShared判断是否加锁。     if (tryAcquireShared(arg) < 0)         doAcquireSharedInterruptibly(arg); } 
  • 6行:调用Sync重写的tryAcquireShared判断是否需要加锁,而不是真的加锁。可以看出当tryAcquireShared返回<0的时候需要往下执行doAcquireSharedInterruptibly进行加锁。
    而tryAcquireShared返回<0指的是一开始设置的count个信号量没有被用完,说明其他线程任务没执行完
  • 7行:加锁。准确说是加入AQS队列,阻塞等待其他线程执行完

下面详细说明:

6.1.1. 判断是否需要加锁

  • Sync tryAcquireShared
protected int tryAcquireShared(int acquires) { 	//当前锁的数量为0,即所有线程任务都执行完了,那么返回1不用加锁 	//否则>0指的是一开始设置的count个信号量没有被用完,说明其他线程任务没执行完。那么该线程需要进行加锁,故返回-1     return (getState() == 0) ? 1 : -1; } 

6.1.2. 需要加锁,那么加入AQS队列阻塞等待其他线程执行完

当state>0说明有信号量没被释放完,那么需要加锁

  • doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) 	throws InterruptedException { 	//以SHARE模式加入AQS队列 	final Node node = addWaiter(Node.SHARED); 	boolean failed = true; 	try { 	//死循环直到获取锁成功 		for (;;) { 			//逻辑1. 			//当前节点的前一个节点是头节点的时候(公平锁:即我的前面没有人等待获取锁),尝试获取锁 		    final Node p = node.predecessor(); 		    if (p == head) { 		    	//state == 0(即没人加锁的情况下才执行加锁--其实并没有真的加锁) 		        int r = tryAcquireShared(arg); 		        if (r >= 0) { 		        	//获取锁成功后设置头节点为当前节点 		            setHeadAndPropagate(node, r); 		            p.next = null; // help GC 		            failed = false; 		            return; 		        } 		    } 			//逻辑2. 			//当前节点的前一个节点状态时SIGNAL(承诺唤醒当前节点)的时候,阻塞当前线程。 			//什么时候唤醒?释放锁的时候 			//唤醒之后干什么?继续死循环执行上面的逻辑1 		    if (shouldParkAfterFailedAcquire(p, node) && 		        parkAndCheckInterrupt()) 		        throw new InterruptedException(); 		} 	//如果发生了异常,那么执行下面的逻辑 	} finally { 		//除了获取锁成功的情况都会执行cancelAcquire方法 		if (failed) 		    cancelAcquire(node); 	} } 
6.1.2.1. 构造节点加入AQS队列
  • AQS.addWaiter
 private Node addWaiter(Node mode) {  	//用当前线程、SHARED模式构造节点     Node node = new Node(Thread.currentThread(), mode);     // 队列不为空     Node pred = tail;     if (pred != null) {     	//插入到队尾         node.prev = pred;         if (compareAndSetTail(pred, node)) {             pred.next = node;             return node;         }     }     //队列为空或者插入到队尾失败     enq(node);     return node; } 

队列为空或者插入到队尾失败的情况执行enq,如下

  • AQS.enq
private Node enq(final Node node) { 	//死循环直到入队成功     for (;;) {         Node t = tail;     	//队列为空,那么初始化头节点。注意是new Node而不是当前node(即队头是个占位符)         if (t == null) {             if (compareAndSetHead(new Node()))                 tail = head; 		//队列不为空,插入到队尾         } else {             node.prev = t;             if (compareAndSetTail(t, node)) {                 t.next = node;                 return t;             }         }     } } 
6.1.2.2. 判断是否需要阻塞
  • shouldParkAfterFailedAcquire
//根据(前一个节点,当前节点)->是否阻塞当前线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {     int ws = pred.waitStatus; 	//前一个节点的状态时SIGNAL,即释放锁后承诺唤醒当前节点,那么返回true可以阻塞当前线程     if (ws == Node.SIGNAL)         return true;     //前一个节点状态>0,即CANCEL。     //那么往前遍历找到没有取消的前置节点。同时从链表中移除CANCEL状态的节点     if (ws > 0) {         do {             node.prev = pred = pred.prev;         } while (pred.waitStatus > 0);         pred.next = node; 	// 前置节点状态>=0,即0或者propagate。 	//这里通过CAS把前置节点状态改成signal成功获取锁,失败的话再阻塞。why?     } else {         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     }     return false; } 
6.1.2.3. 真正阻塞
  • parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() { 	//使用Unsafe阻塞当前线程,这里会清除线程中断的标记,因此需要返回中断的标记     LockSupport.park(this);     return Thread.interrupted(); } 

6.1.3. 不需要加锁

当state=0说明所有信号量已被释放完,那么直接返回,执行业务逻辑

7. 总结

  • 让一个线程等待其他线程完事再往下执行,类似于Thread.join()
  • 主线程创建CountDownLatch的时候初始化了信号量,相当于一开始就有N个人加锁。
  • 主线程调用await的时候检查信号量是否为0,不为0说明其他线程没有执行完,那么加入AQS队列阻塞,等待唤醒
  • 其他线程调用countDown的时候会使信号量-1,最后一个线程减为0的时候会唤醒AQS队列中的所有节点(主线程),让其继续往下执行
  • 主线程被唤醒继续往下执行

8. 参考

发表评论

评论已关闭。

相关文章