1. 是什么
队列同步器,用于实现JUC包的其他并发工具类
2. 如何使用
一般我们不直接使用AQS,而是使用JUC中的其他工具类(如CountDownLatch等),这些工具类覆盖了几乎所有的使用场景,只有在这些工具类无法满足我们的需求时,才去用AQS实现自己的并发工具。
实现的一般的套路如下:
- 定义并发工具类
- 在并发工具类内部定义一个静态内部类,实现AQS,根据需要重写一些方法
- 并发工具类中定义对外的方法,具体实现调用内部静态类的方法进行处理
如下代码为我们实现的简单的CountDownLatch
public class BooleanLatch { //定义内部静态类继承AQS //重写对state操作的方法 private static class Sync extends AbstractQueuedSynchronizer { boolean isSignalled() { return getState() != 0; } protected int tryAcquireShared(int ignore) { return isSignalled() ? 1 : -1; } protected boolean tryReleaseShared(int ignore) { setState(1); return true; } } //对外暴露的是封装后的方法 private final Sync sync = new Sync(); public boolean isSignalled() { return sync.isSignalled(); } public void signal() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public static void main(String[] args) throws InterruptedException { BooleanLatch booleanLatch = new BooleanLatch(); new Thread(()-> { try { TimeUnit.SECONDS.sleep(60); System.out.println(Thread.currentThread().getName() + "休眠结束"); booleanLatch.signal(); } catch (Exception e) { e.printStackTrace(); } }, "releaseThread").start(); booleanLatch.await(); System.out.println(Thread.currentThread().getName() + "继续运行"); } }
3. 原理分析
可以先阅读手写AQS.md以便更好的理解AQS源码
3.1. 构造方法
3.1.1. 由头尾节点和代表锁状态的字段组成
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { protected AbstractQueuedSynchronizer() { } //使用双向队列保存抢占锁失败的线程 private transient volatile Node head; private transient volatile Node tail; //使用CAS操作volatile state字段,表示锁的状态 //state > 0表示获取了锁,state = 0表示释放了锁 private volatile int state; }
3.1.2. Node是个双向队列节点
static final class Node { // 表示shared mode static final Node SHARED = new Node(); // 表示exclusive mode static final Node EXCLUSIVE = null; //当前线程被取消 static final int CANCELLED = 1; //下一个节点需要unpark static final int SIGNAL = -1; //Node的状态 volatile int waitStatus; //前一个节点 volatile Node prev; //后一个节点 volatile Node next; //该节点关联的线程 volatile Thread thread; // Node nextWaiter; // 用于在share mode下创建占位符的头节点 Node() { } // Used by addWaiter Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } }
结构如下图:

3.2. 获取锁的逻辑
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
如果tryAcquire返回True,那么表示得到了锁并且不用执行后面的逻辑,acquire直接返回,此时当前线程没有入队
3.2.1. 尝试获取锁
让我们看看tryAcquire的实现
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
what?居然就只是简单的抛出异常?其实这里运用了模板方法的设计模式,由子类决定具体实现,
如果tryAcquire返回False,便是没有获取锁,那么执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))这段逻辑。
3.2.2. 尝试获取锁失败,则加入AQS队列
首先是addWaiter(Node.EXCLUSIVE),把当前线程以EXCLUSIVE构成node加入等待队列
private Node addWaiter(Node mode) { //用当前线程、EXCLUSIVE模式构造节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; //队列中尾节点不为空(即不是空队列) //那么插入mode到队尾 if (pred != null) { //mode(新节点)的prev指针指向尾节点 node.prev = pred; //cas设置mode为新的尾节点 if (compareAndSetTail(pred, node)) { //设置成功则修改旧尾节点的next为mode pred.next = node; return node; } } //队列为空或者插入到队尾失败 enq(node); return node; }
- 8-17行:快速尝试入队,如果队列中尾节点不为空(即不是空队列),则尝试把当前线程构造的节点加入队列的尾部
- 19行:队列为空或者快速尝试入队失败,那么调用enq。enq代码如下:
private Node enq(final Node node) { for (;;) { Node t = tail; //队尾为空,说明队列为空需要初始化 if (t == null) { // Must initialize //设置new Node为队头(这个头是个占位符) if (compareAndSetHead(new Node())) tail = head; //队列不为空,加入队列尾部 //下面的逻辑跟addWaiter的快速尝试差不多 } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
- 2行:一直尝试,直到入队成功为止
- 3-10:如果队列为空,那么使用CAS操作设置new Node为队列头节点(这个头节点是个占位符),接着回到2行for循环,继而执行下面的else逻辑
- 11-16:队列不为空,使用CAS操作把当前节点加入到队列尾部
CAS操作就是调用的是Unsafe类的方法,如下:
private final boolean compareAndSetTail(Node expect, Node update) { //tail == expect?是的话更新tail为update return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
加入队尾成功后,接着执行acquireQueued方法
3.2.3. 阻塞等待,被唤醒后不停得抢占锁
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //node就是addWaiter里加入到队尾的节点,p是node的前一个节点 final Node p = node.predecessor(); //逻辑1: //p(前置节点)是head(占位符的头节点)的情况下才尝试获取锁。即当前节点前面没有人等待获取锁,换句话说当前节点就是实际的队头或者说等待时间最长的节点 if (p == head && tryAcquire(arg)) { //尝试获取锁成功后设置占位符头节点为node(当前节点) setHead(node); p.next = null; // help GC failed = false; return interrupted; } //逻辑2: //执行到这里说明上面的获取锁的条件不满足或者抢占锁失败 //那么需要判断是否需要阻塞,需要的话则进行阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) //何时执行这段逻辑?发生异常导致获取锁失败的时候 cancelAcquire(node); } }
- 5行:死循环尝试直到获取锁成功
- 6-17行:如果刚刚使用addWaiter入队尾的当前节点前面没有其他节点在等待,即我是等待时间最长的节点,那么尝试抢占锁。这里之所以还需要抢占是因为可以有其他线程(不是队列中的线程)同时进来抢占锁
- 18-23行:抢占锁失败则需要阻塞。阻塞后什么时候被唤醒?当前节点(当前线程)的前序节点释放同步状态时,会唤醒该节点(该线程)(unparkSuccessor)。唤醒之后干什么?继续执行上面的逻辑1
3.2.3.1. 判断是否需要阻塞当前线程
- shouldParkAfterFailedAcquire
//根据(前一个节点,当前节点)->是否阻塞当前线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //如果当前节点的前一个节点状态为SIGNAL //表示它(前一个节点)承诺唤醒当前节点,那么可以放心阻塞,故直接返回ture if (ws == Node.SIGNAL) return true; //>0表示状态为取消的节点 //前置节点已被取消 if (ws > 0) { //一直往前遍历找到没有取消的节点,并且遍历过程中把取消的节点删除 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; //前置节点未被取消(即为 0 或者 Node.PROPAGATE) } else { //CAS设置前置节点状态为SIGNAL,下次进来就是5-6行 //pred的waitStatus==ws?是的话更新waitStatus为Node.SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
- 6-7行:当前节点的前置节点状态为SIGNAL的时候,可以阻塞
3.2.3.2. 阻塞当前线程
- parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() { //简单的调用LockSupport的park方法阻塞当前线程 LockSupport.park(this); //返回是否被中断的标志 return Thread.interrupted(); }
- LockSupport park
public static void park(Object blocker) { Thread t = Thread.currentThread(); //用当前AQS实例对象作为blocker,记录当前线程等待的对象(阻塞对象) setBlocker(t, blocker); //简单调用UNSAFE的park方法阻塞当前线程 UNSAFE.park(false, 0L); setBlocker(t, null); }
3.2.3.3. 抢占锁过程中发生异常,那么从阻塞队列中移除当前节点
- cancelAcquire
//node是addWaiter加入队尾的节点 //这段取消的逻辑要做的就是从队列中删除该节点 private void cancelAcquire(Node node) { // 当前节点为空,那么不需要删除,直接返回 if (node == null) return; //1. node不再关联到任何线程 node.thread = null; //2. 找到前置节点 //跳过被cancel的前继node,往前找到一个有效的前继节点pred Node pred = node.prev; while (pred.waitStatus > 0) //>0表示状态为取消的节点 node.prev = pred = pred.prev;//限制性pred=pred.prev;在执行node.prev=pred //predNext是前置节点的下一个节点,注意这里可能不是当前node Node predNext = pred.next; //3. 将node的waitStatus置为CANCELLED node.waitStatus = Node.CANCELLED; //4. 从队列中删除该节点 //当前node是尾节点的删除操作:CAS设置队列的尾节点为pred if (node == tail && compareAndSetTail(node, pred)) { //CAS设置pred.next为null compareAndSetNext(pred, predNext, null); //当前node不是尾节点的删除操作 } else { int ws; //5. 下面的判断是指node既不是tail,又不是head的后继节点的情况 //当前节点的前置节点不是队头 //并且 //要么 前置节点的状态为Node.SIGNAL(即承诺唤醒下一个节点) //要么 前置节点的状态是正常的且CAS设置前置节点的状态为Node.SIGNAL成功 且 //前置节点的线程为null if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { //使node的前继节点的next指向node的后继节点,相当于删除了node Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); //6. 如果node是head的后继节点,则直接唤醒node的后继节点就行 } else { unparkSuccessor(node); } node.next = node; // help GC } }
3.2.4. 恢复中断标记
如果acquireQueued返回true,说明发生过中断。
但是呢,acquireQueued会把中断标记清除,因此需要手动调用selfInterrupt恢复中断标记
static void selfInterrupt() { Thread.currentThread().interrupt(); }
3.3. 释放锁的逻辑
public final boolean release(int arg) { //释放同步状态成功 if (tryRelease(arg)) { //把头节点从队列中移除 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
解锁的逻辑主要是:
- 2行:尝试释放锁
- 3-5行:释放锁成功则唤醒下一个节点的线程,让其继续抢占锁
详细说明如下:
3.3.1. 尝试释放锁
- tryRelease,和tryAcquire方法一样延迟到子类实现
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); }
3.3.2. 释放锁成功则唤醒下一个节点的线程,让其继续抢占锁
- 如果头节点不为null,那么调用unparkSuccessor方法唤醒下一个节点
private void unparkSuccessor(Node node) { int ws = node.waitStatus; //如果当前节点的状态是正常的? if (ws < 0) //当前节点的状态<0,则把状态改为0 //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何 compareAndSetWaitStatus(node, ws, 0); //获取当前节点的下一个节点 Node s = node.next; //如果下一个节点是空(即当前节点是尾节点)或者下一个节点的状态>0(取消) if (s == null || s.waitStatus > 0) { s = null; //从尾节点往前遍历至当前节点 for (Node t = tail; t != null && t != node; t = t.prev) //找到最靠近当前节点的状态<=0(非取消)的节点 if (t.waitStatus <= 0) s = t; } //唤醒当前节点的下一个节点 if (s != null) LockSupport.unpark(s.thread); }
- unpark方法
public static void unpark(Thread thread) { if (thread != null) //唤醒thread线程 UNSAFE.unpark(thread); }
4. 总结
- thead1获取锁成功,执行业务逻辑
- thread2获取失败,进入CLH队尾,不停自旋检查
- 如果前一个节点的状态是signal,则进入阻塞;否则一直自旋直到获取锁成功
- thread1释放锁,唤醒下一个节点
- thead2从阻塞中醒来,获取锁成功,执行业务逻辑
5. 参考
- JUC的AQS学习-ReentrantLock源代码分析 | 并发编程网 – ifeve.com
- Java AQS unparkSuccessor 方法中for循环从tail开始而不是head的疑问? - 知乎
- 【死磕 Java 并发】—– J.U.C 之 AQS:AQS 简介 | 芋道源码 —— 纯源码解析博客
- 【死磕 Java 并发】—– J.U.C 之 AQS:CLH 同步队列 | 芋道源码 —— 纯源码解析博客
- 【死磕 Java 并发】—– J.U.C 之 AQS:同步状态的获取与释放 | 芋道源码 —— 纯源码解析博客
- 【死磕 Java 并发】—– J.U.C 之 AQS:阻塞和唤醒线程 | 芋道源码 —— 纯源码解析博客