Java源码分析系列笔记-5.AQS

1. 是什么

队列同步器,用于实现JUC包的其他并发工具类

2. 如何使用

一般我们不直接使用AQS,而是使用JUC中的其他工具类(如CountDownLatch等),这些工具类覆盖了几乎所有的使用场景,只有在这些工具类无法满足我们的需求时,才去用AQS实现自己的并发工具。
实现的一般的套路如下:

  • 定义并发工具类
  • 在并发工具类内部定义一个静态内部类,实现AQS,根据需要重写一些方法
  • 并发工具类中定义对外的方法,具体实现调用内部静态类的方法进行处理

如下代码为我们实现的简单的CountDownLatch

public class BooleanLatch {      //定义内部静态类继承AQS     //重写对state操作的方法     private static class Sync extends AbstractQueuedSynchronizer     {         boolean isSignalled()         {             return getState() != 0;         }          protected int tryAcquireShared(int ignore)         {             return isSignalled() ? 1 : -1;         }          protected boolean tryReleaseShared(int ignore)         {             setState(1);             return true;         }     }      //对外暴露的是封装后的方法     private final Sync sync = new Sync();      public boolean isSignalled()     {         return sync.isSignalled();     }      public void signal()     {         sync.releaseShared(1);     }      public void await() throws InterruptedException     {         sync.acquireSharedInterruptibly(1);     }      public static void main(String[] args) throws InterruptedException     {         BooleanLatch booleanLatch = new BooleanLatch();         new Thread(()-> {             try             {                 TimeUnit.SECONDS.sleep(60);                 System.out.println(Thread.currentThread().getName() + "休眠结束");                 booleanLatch.signal();             }             catch (Exception e)             {                 e.printStackTrace();             }         }, "releaseThread").start();          booleanLatch.await();         System.out.println(Thread.currentThread().getName() + "继续运行");      } } 

3. 原理分析

可以先阅读手写AQS.md以便更好的理解AQS源码

3.1. 构造方法

3.1.1. 由头尾节点和代表锁状态的字段组成

public abstract class AbstractQueuedSynchronizer     extends AbstractOwnableSynchronizer     implements java.io.Serializable {          protected AbstractQueuedSynchronizer() { }  	//使用双向队列保存抢占锁失败的线程     private transient volatile Node head;     private transient volatile Node tail;  	//使用CAS操作volatile state字段,表示锁的状态 	//state > 0表示获取了锁,state = 0表示释放了锁     private volatile int state; }  

3.1.2. Node是个双向队列节点

static final class Node {         // 表示shared mode         static final Node SHARED = new Node();         // 表示exclusive mode         static final Node EXCLUSIVE = null;                  //当前线程被取消         static final int CANCELLED =  1;         //下一个节点需要unpark      	static final int SIGNAL    = -1;          //Node的状态         volatile int waitStatus; 		//前一个节点         volatile Node prev; 		//后一个节点         volatile Node next; 		//该节点关联的线程         volatile Thread thread;         //         Node nextWaiter; 	 		// 用于在share mode下创建占位符的头节点         Node() {             }   		// Used by addWaiter         Node(Thread thread, Node mode) {                 this.nextWaiter = mode;             this.thread = thread;         }      }  

结构如下图:
Java源码分析系列笔记-5.AQS

3.2. 获取锁的逻辑

public final void acquire(int arg) {         if (!tryAcquire(arg) &&             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))             selfInterrupt();     }  

如果tryAcquire返回True,那么表示得到了锁并且不用执行后面的逻辑,acquire直接返回,此时当前线程没有入队

3.2.1. 尝试获取锁

让我们看看tryAcquire的实现

protected boolean tryAcquire(int arg) {         throw new UnsupportedOperationException();     } 

what?居然就只是简单的抛出异常?其实这里运用了模板方法的设计模式,由子类决定具体实现,
如果tryAcquire返回False,便是没有获取锁,那么执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))这段逻辑。

3.2.2. 尝试获取锁失败,则加入AQS队列

首先是addWaiter(Node.EXCLUSIVE),把当前线程以EXCLUSIVE构成node加入等待队列

private Node addWaiter(Node mode) {     //用当前线程、EXCLUSIVE模式构造节点     Node node = new Node(Thread.currentThread(), mode);     // Try the fast path of enq; backup to full enq on failure     Node pred = tail;     //队列中尾节点不为空(即不是空队列)     //那么插入mode到队尾     if (pred != null) {         //mode(新节点)的prev指针指向尾节点         node.prev = pred;         //cas设置mode为新的尾节点         if (compareAndSetTail(pred, node)) {             //设置成功则修改旧尾节点的next为mode             pred.next = node;             return node;         }     }     //队列为空或者插入到队尾失败     enq(node);     return node; } 
  • 8-17行:快速尝试入队,如果队列中尾节点不为空(即不是空队列),则尝试把当前线程构造的节点加入队列的尾部
  • 19行:队列为空或者快速尝试入队失败,那么调用enq。enq代码如下:
private Node enq(final Node node) {     for (;;) {         Node t = tail;         //队尾为空,说明队列为空需要初始化         if (t == null) { // Must initialize             //设置new Node为队头(这个头是个占位符)             if (compareAndSetHead(new Node()))                 tail = head;         //队列不为空,加入队列尾部         //下面的逻辑跟addWaiter的快速尝试差不多         } else {             node.prev = t;             if (compareAndSetTail(t, node)) {                 t.next = node;                 return t;             }         }     } } 
  • 2行:一直尝试,直到入队成功为止
  • 3-10:如果队列为空,那么使用CAS操作设置new Node为队列头节点(这个头节点是个占位符),接着回到2行for循环,继而执行下面的else逻辑
  • 11-16:队列不为空,使用CAS操作把当前节点加入到队列尾部
    CAS操作就是调用的是Unsafe类的方法,如下:
private final boolean compareAndSetTail(Node expect, Node update) { 		//tail == expect?是的话更新tail为update     return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } 

加入队尾成功后,接着执行acquireQueued方法

3.2.3. 阻塞等待,被唤醒后不停得抢占锁

final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         boolean interrupted = false;         for (;;) {             //node就是addWaiter里加入到队尾的节点,p是node的前一个节点             final Node p = node.predecessor();             //逻辑1:             //p(前置节点)是head(占位符的头节点)的情况下才尝试获取锁。即当前节点前面没有人等待获取锁,换句话说当前节点就是实际的队头或者说等待时间最长的节点             if (p == head && tryAcquire(arg)) {                 //尝试获取锁成功后设置占位符头节点为node(当前节点)                 setHead(node);                 p.next = null; // help GC                 failed = false;                 return interrupted;             }                          //逻辑2:             //执行到这里说明上面的获取锁的条件不满足或者抢占锁失败             //那么需要判断是否需要阻塞,需要的话则进行阻塞             if (shouldParkAfterFailedAcquire(p, node) &&                 parkAndCheckInterrupt())                 interrupted = true;         }     } finally {         if (failed)             //何时执行这段逻辑?发生异常导致获取锁失败的时候             cancelAcquire(node);     } } 
  • 5行:死循环尝试直到获取锁成功
  • 6-17行:如果刚刚使用addWaiter入队尾的当前节点前面没有其他节点在等待,即我是等待时间最长的节点,那么尝试抢占锁。这里之所以还需要抢占是因为可以有其他线程(不是队列中的线程)同时进来抢占锁
  • 18-23行:抢占锁失败则需要阻塞。阻塞后什么时候被唤醒?当前节点(当前线程)的前序节点释放同步状态时,会唤醒该节点(该线程)(unparkSuccessor)。唤醒之后干什么?继续执行上面的逻辑1
3.2.3.1. 判断是否需要阻塞当前线程
  • shouldParkAfterFailedAcquire
//根据(前一个节点,当前节点)->是否阻塞当前线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {         int ws = pred.waitStatus;         //如果当前节点的前一个节点状态为SIGNAL         //表示它(前一个节点)承诺唤醒当前节点,那么可以放心阻塞,故直接返回ture         if (ws == Node.SIGNAL)             return true;                      //>0表示状态为取消的节点         //前置节点已被取消         if (ws > 0) {              //一直往前遍历找到没有取消的节点,并且遍历过程中把取消的节点删除 	          do {                 node.prev = pred = pred.prev;             } while (pred.waitStatus > 0);             pred.next = node;         //前置节点未被取消(即为 0 或者 Node.PROPAGATE)         } else {              //CAS设置前置节点状态为SIGNAL,下次进来就是5-6行             //pred的waitStatus==ws?是的话更新waitStatus为Node.SIGNAL             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);         }         return false;     } 
  • 6-7行:当前节点的前置节点状态为SIGNAL的时候,可以阻塞
3.2.3.2. 阻塞当前线程
  • parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {     //简单的调用LockSupport的park方法阻塞当前线程     LockSupport.park(this);     //返回是否被中断的标志     return Thread.interrupted(); } 
  • LockSupport park
public static void park(Object blocker) {     Thread t = Thread.currentThread();     //用当前AQS实例对象作为blocker,记录当前线程等待的对象(阻塞对象)     setBlocker(t, blocker);     //简单调用UNSAFE的park方法阻塞当前线程     UNSAFE.park(false, 0L);     setBlocker(t, null); } 
3.2.3.3. 抢占锁过程中发生异常,那么从阻塞队列中移除当前节点
  • cancelAcquire
//node是addWaiter加入队尾的节点 //这段取消的逻辑要做的就是从队列中删除该节点 private void cancelAcquire(Node node) {     // 当前节点为空,那么不需要删除,直接返回     if (node == null)         return;     //1. node不再关联到任何线程     node.thread = null;     //2. 找到前置节点     //跳过被cancel的前继node,往前找到一个有效的前继节点pred     Node pred = node.prev;     while (pred.waitStatus > 0) //>0表示状态为取消的节点         node.prev = pred = pred.prev;//限制性pred=pred.prev;在执行node.prev=pred      //predNext是前置节点的下一个节点,注意这里可能不是当前node     Node predNext = pred.next;     //3. 将node的waitStatus置为CANCELLED     node.waitStatus = Node.CANCELLED;     //4. 从队列中删除该节点     //当前node是尾节点的删除操作:CAS设置队列的尾节点为pred     if (node == tail && compareAndSetTail(node, pred)) {         //CAS设置pred.next为null         compareAndSetNext(pred, predNext, null); 	//当前node不是尾节点的删除操作     } else { 		int ws;         //5. 下面的判断是指node既不是tail,又不是head的后继节点的情况          //当前节点的前置节点不是队头          //并且                  //要么 前置节点的状态为Node.SIGNAL(即承诺唤醒下一个节点)                 //要么 前置节点的状态是正常的且CAS设置前置节点的状态为Node.SIGNAL成功 且                     //前置节点的线程为null         if (pred != head &&             ((ws = pred.waitStatus) == Node.SIGNAL ||              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&             pred.thread != null) {              //使node的前继节点的next指向node的后继节点,相当于删除了node             Node next = node.next;             if (next != null && next.waitStatus <= 0)                 compareAndSetNext(pred, predNext, next);         //6. 如果node是head的后继节点,则直接唤醒node的后继节点就行         } else {             unparkSuccessor(node);         }          node.next = node; // help GC     } } 

3.2.4. 恢复中断标记

如果acquireQueued返回true,说明发生过中断。
但是呢,acquireQueued会把中断标记清除,因此需要手动调用selfInterrupt恢复中断标记

static void selfInterrupt() {     Thread.currentThread().interrupt(); }  

3.3. 释放锁的逻辑

public final boolean release(int arg) { 	//释放同步状态成功     if (tryRelease(arg)) {     	//把头节点从队列中移除         Node h = head;         if (h != null && h.waitStatus != 0)             unparkSuccessor(h);         return true;     }     return false; } 

解锁的逻辑主要是:

  • 2行:尝试释放锁
  • 3-5行:释放锁成功则唤醒下一个节点的线程,让其继续抢占锁

详细说明如下:

3.3.1. 尝试释放锁

  • tryRelease,和tryAcquire方法一样延迟到子类实现
protected boolean tryRelease(int arg) {     throw new UnsupportedOperationException(); } 

3.3.2. 释放锁成功则唤醒下一个节点的线程,让其继续抢占锁

  • 如果头节点不为null,那么调用unparkSuccessor方法唤醒下一个节点
private void unparkSuccessor(Node node) {          int ws = node.waitStatus;     //如果当前节点的状态是正常的?     if (ws < 0)         //当前节点的状态<0,则把状态改为0         //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何         compareAndSetWaitStatus(node, ws, 0);            //获取当前节点的下一个节点     Node s = node.next;     //如果下一个节点是空(即当前节点是尾节点)或者下一个节点的状态>0(取消)     if (s == null || s.waitStatus > 0) {         s = null;         //从尾节点往前遍历至当前节点         for (Node t = tail; t != null && t != node; t = t.prev)             //找到最靠近当前节点的状态<=0(非取消)的节点             if (t.waitStatus <= 0)                 s = t;     }     //唤醒当前节点的下一个节点     if (s != null)         LockSupport.unpark(s.thread); } 
  • unpark方法
public static void unpark(Thread thread) {     if (thread != null)         //唤醒thread线程         UNSAFE.unpark(thread); }  

4. 总结

  • thead1获取锁成功,执行业务逻辑
  • thread2获取失败,进入CLH队尾,不停自旋检查
    • 如果前一个节点的状态是signal,则进入阻塞;否则一直自旋直到获取锁成功
  • thread1释放锁,唤醒下一个节点
  • thead2从阻塞中醒来,获取锁成功,执行业务逻辑

5. 参考

发表评论

评论已关闭。

相关文章