public class ReentrantLock implements Lock, java.io.Serializable { ... //Synchronizer providing all implementation mechanics private final Sync sync; //Creates an instance of ReentrantLock. //This is equivalent to using ReentrantLock(false). public ReentrantLock() { sync = new NonfairSync(); } //Creates an instance of ReentrantLock with the given fairness policy. public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ... }
public class ReentrantLock implements Lock, java.io.Serializable { ... //Acquires the lock. //Acquires the lock if it is not held by another thread and returns immediately, //setting the lock hold count to one. //If the current thread already holds the lock, //then the hold count is incremented by one and the method returns immediately. //If the lock is held by another thread, //then the current thread becomes disabled for thread scheduling purposes //and lies dormant until the lock has been acquired, at which time the lock hold count is set to one. public void lock() { //执行继承自ReentrantLock内部类Sync的NonfairSync类或FairSync类的lock()方法 sync.lock(); } //Acquires the lock unless the current thread is Thread#interrupt interrupted. public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } //Acquires the lock only if it is not held by another thread at the time of invocation. public boolean tryLock() { return sync.nonfairTryAcquire(1); } //Acquires the lock if it is not held by another thread within the given waiting time //and the current thread has not been Thread#interrupt interrupted. public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } //Attempts to release this lock. public void unlock() { sync.release(1); } ... }
public class ReentrantLock implements Lock, java.io.Serializable { ... //Synchronizer providing all implementation mechanics private final Sync sync; //Base of synchronization control for this lock. //Subclassed into fair and nonfair versions below. //Uses AQS state to represent the number of holds on the lock. abstract static class Sync extends AbstractQueuedSynchronizer { ... } //Sync object for fair locks static final class FairSync extends Sync { ... } //Sync object for non-fair locks static final class NonfairSync extends Sync { ... } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Head of the wait queue, lazily initialized. //Except for initialization, it is modified only via method setHead. //Note: If head exists, its waitStatus is guaranteed not to be CANCELLED. private transient volatile Node head; //Tail of the wait queue, lazily initialized. //Modified only via method enq to add new wait node. private transient volatile Node tail; //The synchronization state. private volatile int state; ... }
public class ReentrantLock implements Lock, java.io.Serializable { //Synchronizer providing all implementation mechanics private final Sync sync; ... //If the lock is held by another thread, //then the current thread becomes disabled for thread scheduling purposes //and lies dormant until the lock has been acquired, at which time the lock hold count is set to one. public void lock() { //执行继承自ReentrantLock内部类Sync的NonfairSync类或FairSync类的lock()方法 sync.lock(); } //Sync object for non-fair locks static final class NonfairSync extends Sync { //Performs lock. Try immediate barge, backing up to normal acquire on failure. final void lock() { //执行AQS的compareAndSetState()方法 if (compareAndSetState(0, 1)) { //执行AQS的setExclusiveOwnerThread()方法 setExclusiveOwnerThread(Thread.currentThread()); } else { //执行AQS的acquire()方法 acquire(1); } } ... } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private volatile int state; static { try { stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state")); ... } catch (Exception ex) { throw new Error(ex); } } //Atomically sets synchronization state to the given updated value if the current state value equals the expected value. //This operation has memory semantics of a volatile read and write. protected final boolean compareAndSetState(int expect, int update) { //See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } ... } public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { ... //The current owner of exclusive mode synchronization. private transient Thread exclusiveOwnerThread; //Sets the thread that currently owns exclusive access. //A null argument indicates that no thread owns access. 
//This method does not otherwise impose any synchronization or volatile field accesses. protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } ... }
public class ReentrantLock implements Lock, java.io.Serializable { ... //Synchronizer providing all implementation mechanics private final Sync sync; public void lock() { //执行继承自ReentrantLock内部类Sync的NonfairSync类或FairSync类的lock()方法 sync.lock(); } //NonfairSync是ReentractLock的内部类,继承自ReentractLock的另一内部类Sync static final class NonfairSync extends Sync { final void lock() { //执行AQS的compareAndSetState()方法 if (compareAndSetState(0, 1)) { //执行AQS的setExclusiveOwnerThread()方法 setExclusiveOwnerThread(Thread.currentThread()); } else { //执行AQS的acquire()方法 acquire(1); } } //判断是否是重入锁 + 是否已释放锁 protected final boolean tryAcquire(int acquires) { //执行继承自AQS的Sync的nonfairTryAcquire()方法 return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { ... //判断是否是重入锁 + 是否已释放锁,尝试获取锁 final boolean nonfairTryAcquire(int acquires) { //先获取当前的线程 final Thread current = Thread.currentThread(); //获取volatile修饰的state变量值 int c = getState(); //有可能在获取当前线程和state变量值的时候,持有锁的线程释放了锁 //所以需要再次判断一下state是否为0,如果state是0,那么再次尝试加锁 if (c == 0) {//表示无锁状态 //执行AQS的compareAndSetState()方法,CAS设置state成功表明当前线程获取到锁 if (compareAndSetState(0, acquires)) { //执行AQS的setExclusiveOwnerThread()方法,保存获得锁的线程为当前线程 setExclusiveOwnerThread(current); return true; } } //代码执行到这里还是没有线程释放锁,state还是 != 0 //所以先通过AQS的getExclusiveOwnerThread()方法获取当前线程 //然后再判断持有锁的线程是否为当前线程,即当前线程 == exclusiveOwnerThread //如果持有锁的线程是当前线程,则代表当前线程在重复加锁,所以增加重入次数 else if (current == getExclusiveOwnerThread()) { //此时,c = 1,而nextc = c(1) + acquires(1) = 2 //这代表的是当前线程重入锁2次,nextc代表重入次数 int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } //执行AQS的setState()方法,修改volatile修饰的state setState(nextc); return true; } return false; } ... } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //The synchronization state. private volatile int state; //Returns the current value of synchronization state. 
//This operation has memory semantics of a volatile read. //@return current state value protected final int getState() { return state; } //Sets the value of synchronization state. //This operation has memory semantics of a volatile write. //@param newState the new state value protected final void setState(int newState) { state = newState; } //Acquires in exclusive mode, ignoring interrupts. //Implemented by invoking at least once tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success. //This method can be used to implement method Lock#lock. public final void acquire(int arg) { //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } //Attempts to acquire in exclusive mode. //This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it. //This method is always invoked by the thread performing acquire. //If this method reports failure, the acquire method may queue the thread, if it is not already queued, //until it is signalled by a release from some other thread. //This can be used to implement method Lock#tryLock(). //The default implementation throws UnsupportedOperationException. protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } ... } public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { ... //The current owner of exclusive mode synchronization. private transient Thread exclusiveOwnerThread; //Sets the thread that currently owns exclusive access. //A null argument indicates that no thread owns access. //This method does not otherwise impose any synchronization or volatile field accesses. 
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } //Returns the thread last set by setExclusiveOwnerThread, or null if never set. //This method does not otherwise impose any synchronization or volatile field accesses. protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } ... }
public class ReentrantLock implements Lock, java.io.Serializable { ... //Synchronizer providing all implementation mechanics private final Sync sync; public void lock() { //执行继承ReentrantLock内部类Sync的NonfairSync类的lock()方法 sync.lock(); } ... } public class ReentrantLock implements Lock, java.io.Serializable { ... //这是ReentractLock里的内部类 static final class NonfairSync extends Sync { final void lock() { //执行AQS的compareAndSetState()方法 if (compareAndSetState(0, 1)) { //执行AQS的setExclusiveOwnerThread()方法 setExclusiveOwnerThread(Thread.currentThread()); } else { //通过CAS获取锁失败时,执行AQS的acquire()方法 acquire(1); } } //判断是否是重入锁 + 是否已释放锁 protected final boolean tryAcquire(int acquires) { //执行继承自AQS的Sync的nonfairTryAcquire()方法 return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { ... //判断是否是重入锁 + 是否已释放锁,尝试获取锁 final boolean nonfairTryAcquire(int acquires) { //先获取当前的线程 final Thread current = Thread.currentThread(); //获取volatile修饰的state变量值 int c = getState(); //有可能在获取当前线程和state变量值的时候,持有锁的线程释放了锁 //所以需要再次判断一下state是否为0,如果state是0,那么再次尝试加锁 if (c == 0) {//表示无锁状态 //执行AQS的compareAndSetState()方法,CAS设置state成功表明当前线程获取到锁 if (compareAndSetState(0, acquires)) { //执行AQS的setExclusiveOwnerThread()方法,保存获得锁的线程为当前线程 setExclusiveOwnerThread(current); return true; } } //代码执行到这里还是没有线程释放锁,state还是 != 0 //所以先通过AQS的getExclusiveOwnerThread()方法获取当前线程 //然后再判断持有锁的线程是否为当前线程,即当前线程 == exclusiveOwnerThread //如果持有锁的线程是当前线程,则代表当前线程在重复加锁,所以增加重入次数 else if (current == getExclusiveOwnerThread()) { //此时,c = 1,而nextc = c(1) + acquires(1) = 2 //这代表的是当前线程重入锁2次,nextc代表重入次数 int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } //修改这个volatile修饰的state,volatile保证了可见性 setState(nextc); return true; } return false; } ... } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Acquires in exclusive mode, ignoring interrupts. 
//Implemented by invoking at least once tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success. //This method can be used to implement method Lock#lock. public final void acquire(int arg) { //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁 //当前线程加锁失败时,执行NonfairSync的tryAcquire()方法会返回false,即执行ReentrantLock内部类Sync的nonfairTryAcquire()方法返回false //所以当前线程加锁失败时会先执行AQS的addWaiter()方法,再执行AQS的acquireQueued()方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } //Attempts to acquire in exclusive mode. //This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it. //This method is always invoked by the thread performing acquire. //If this method reports failure, the acquire method may queue the thread, if it is not already queued, //until it is signalled by a release from some other thread. //This can be used to implement method Lock#tryLock(). //The default implementation throws UnsupportedOperationException. protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } ... }
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private transient volatile Node head; private transient volatile Node tail; private static final long headOffset; private static final long tailOffset; static { try { headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail")); ... } catch (Exception ex) { throw new Error(ex); } } ... //Acquires in exclusive mode, ignoring interrupts. //Implemented by invoking at least once tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success. //This method can be used to implement method Lock#lock. public final void acquire(int arg) { //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁 //当前线程加锁失败时,执行NonfairSync的tryAcquire()方法会返回false,即执行ReentrantLock内部类Sync的nonfairTryAcquire()方法返回false //所以当前线程加锁失败时会先执行AQS的addWaiter()方法,再执行AQS的acquireQueued()方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { selfInterrupt(); } } //Attempts to acquire in exclusive mode. //This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it. //This method is always invoked by the thread performing acquire. //If this method reports failure, the acquire method may queue the thread, if it is not already queued, //until it is signalled by a release from some other thread. //This can be used to implement method Lock#tryLock(). //The default implementation throws UnsupportedOperationException. protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //Creates and enqueues node for current thread and given mode. 
//@param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared //@return the new node private Node addWaiter(Node mode) { //首先将获取锁失败的线程封装为一个Node对象; //然后使用尾插法将这个Node对象插入双向链表中,形成一个等待队列; Node node = new Node(Thread.currentThread(), mode); Node pred = tail;//tail是AQS等待队列的尾结点 if (pred != null) {//当tail不为空时 node.prev = pred;//把根据当前线程封装的node结点的prev指向tail if (compareAndSetTail(pred, node)) {//通过CAS把node结点插入等待队列尾部 pred.next = node;//把原tail的next指向node结点 return node; } } //当tail为空时,把node结点添加到等待队列 enq(node); return node; } private Node enq(final Node node) { for (;;) { //首先获取等待队列的队尾结点 Node t = tail; //队列的头是个空Node,新增一个结点会从尾部插入 if (t == null) { //初始化一个空的Node结点并赋值给head和tail if (compareAndSetHead(new Node())) { tail = head; } } else { node.prev = t; //尝试比较tail变量是否为t,如果为t的话,那么tail指针就指向node if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } private final boolean compareAndSetHead(Node update) { //headOffset是head变量在AQS类的位置,判断head变量是否为null //如果是null就将head设置为空的Node结点 //所以队列的头是个空Node,新增一个结点会从尾部插入 return unsafe.compareAndSwapObject(this, headOffset, null, update); } private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } static final class Node { //如果当前线程获取锁失败,则会进入阻塞等待的状态,当前线程会被挂起; //阻塞状态可以分为很多种不同的阻塞状态: //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) volatile int waitStatus; //一个结点的上一个结点,prev指针,指向Node结点的上一个Node volatile Node prev; //一个结点的下一个结点,next指针,指向Node结点的下一个Node volatile Node next; //Node结点里封装的一个线程 volatile Thread thread; //可以认为是下一个等待线程 Node nextWaiter; } ... }
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Acquires in exclusive mode, ignoring interrupts. //Implemented by invoking at least once tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success. //This method can be used to implement method Lock#lock. public final void acquire(int arg) { //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁 //当前线程加锁失败时,执行NonfairSync的tryAcquire()方法会返回false,即执行ReentrantLock内部类Sync的nonfairTryAcquire()方法返回false //所以当前线程加锁失败时会先执行AQS的addWaiter()方法,再执行AQS的acquireQueued()方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { //对当前线程进行中断 selfInterrupt(); } } //Convenience method to interrupt current thread. static void selfInterrupt() { Thread.currentThread().interrupt(); } //Acquires in exclusive uninterruptible mode for thread already in queue. //Used by condition wait methods as well as acquire. 
//@param node the node //@param arg the acquire argument //@return true if interrupted while waiting final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //获取node结点的上一个结点,也就是prev指针指向的结点 final Node p = node.predecessor(); //这里会先判断传入结点的上一个结点是否为等待队列的头结点 //如果是,则再次调用继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,尝试获取锁 //如果获取锁成功,那么就将当前线程对应的Node结点从等待队列中移除 if (p == head && tryAcquire(arg)) { //重新设置传入的Node结点为头结点,同时将该结点设置为空 setHead(node);//线程唤醒后,头结点会后移 p.next = null;//help GC failed = false; return interrupted; } //如果再次尝试获取锁失败,则执行shouldParkAfterFailedAcquire()方法 //判断是否需要将当前线程挂起,然后进行阻塞等待 //如果需要挂起,那么就会使用park操作挂起当前线程 //执行shouldParkAfterFailedAcquire()方法会设置node结点的前驱结点的状态为SIGNAL //执行parkAndCheckInterrupt()方法挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { //如果线程被中断,只会暂时设置interrupted为true,还需要继续等待被唤醒来获取锁才能真正中断 interrupted = true; } } } finally { if (failed) { cancelAcquire(node); } } } //Sets head of queue to be node, thus dequeuing. //Called only by acquire methods. //Also nulls out unused fields for sake of GC and to suppress unnecessary signals and traversals. private void setHead(Node node) { //重新设置传入的Node结点为头结点,同时将该结点设置为空 head = node; node.thread = null; node.prev = null; } //Checks and updates status for a node that failed to acquire. //Returns true if thread should block. //This is the main signal control in all acquire loops. //Requires that pred == node.prev. 
//@param pred node's predecessor holding status //@param node the node //@return true if thread should block private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //pred -> 空Node //Node结点的状态watiStatus可以分为如下几种: //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) //默认情况下,watiStatus应该是0,或者是空 int ws = pred.waitStatus; //如果前驱结点的状态为SIGNAL,那么当前线程就需要被挂起进行阻塞 if (ws == Node.SIGNAL) { return true; } //如果前驱结点的状态为CANCELED,那么需要移除前驱结点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //修改前驱结点的状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } private final boolean parkAndCheckInterrupt() { //LockSupport的park操作,就是将一个线程进行挂起 //必须得有另外一个线程来对当前线程执行unpark操作,才能唤醒挂起的线程 LockSupport.park(this); return Thread.interrupted(); } ... }
public class ReentrantLock implements Lock, java.io.Serializable { ... //Synchronizer providing all implementation mechanics private final Sync sync; //Attempts to release this lock. //If the current thread is the holder of this lock then the hold count is decremented. //If the hold count is now zero then the lock is released. //If the current thread is not the holder of this lock then IllegalMonitorStateException is thrown. public void unlock() { //执行AQS的release()方法 sync.release(1); } ... abstract static class Sync extends AbstractQueuedSynchronizer { ... protected final boolean tryRelease(int releases) { //重入锁的情况:getState() == 2,releases == 1,getState() - releases = 1,c = 1 int c = getState() - releases; //如果当前线程不等于加锁的线程,说明不是当前线程加的锁,结果当前线程来释放锁,会抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) { throw new IllegalMonitorStateException(); } boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } ... } } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Releases in exclusive mode. //Implemented by unblocking one or more threads if tryRelease returns true. //This method can be used to implement method Lock#unlock. //@param arg the release argument. This value is conveyed to #ryRelease but is otherwise uninterpreted and can represent anything you like. public final boolean release(int arg) { //执行AQS的子类Sync的tryRelease()方法 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) { //传入头结点 unparkSuccessor(h); } return true; } return false; } //Wakes up node's successor, if one exists. private void unparkSuccessor(Node node) { //If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling. //It is OK if this fails or if status is changed by waiting thread. 
//Node结点的状态watiStatus可以分为如下几种: //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3) //默认情况下,watiStatus应该是0,或者是空 //获得头结点的状态 int ws = node.waitStatus; //需要设置头结点的状态为0 if (ws < 0) { compareAndSetWaitStatus(node, ws, 0); } //Thread to unpark is held in successor, which is normally just the next node. //But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor. //获取头结点的后继结点 Node s = node.next; //如果头结点的后继结点为null或其状态为CANCELED if (s == null || s.waitStatus > 0) { s = null; //那么就从尾结点开始扫描,找到距离头结点最近的 + 状态不是取消的结点 for (Node t = tail; t != null && t != node; t = t.prev) { if (t.waitStatus <= 0) { s = t; } } } if (s != null) { //唤醒传入的头结点的后继结点对应的线程 LockSupport.unpark(s.thread); } } ... }
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } ... //Acquires the lock unless the current thread is Thread#interrupt interrupted. public void lockInterruptibly() throws InterruptedException { //执行AQS的acquireInterruptibly()方法 sync.acquireInterruptibly(1); } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Acquires in exclusive mode, aborting if interrupted. //Implemented by first checking interrupt status, then invoking at least once #tryAcquire, returning on success. //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquire until success or the thread is interrupted. //This method can be used to implement method Lock#lockInterruptibly. public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } //首先还是执行AQS子类重写的tryAcquire()方法 if (!tryAcquire(arg)) { //执行AQS的doAcquireInterruptibly()方法 doAcquireInterruptibly(arg); } } //Acquires in exclusive interruptible mode. private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } //执行shouldParkAfterFailedAcquire()方法会设置node结点的前驱结点的状态为SIGNAL //执行parkAndCheckInterrupt()方法挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { //和acquireQueued()方法不同的是,这里收到线程中断信号后,不再去重新竞争锁,而是直接抛异常返回 throw new InterruptedException(); } } } finally { if (failed) { cancelAcquire(node); } } } //Acquires in exclusive uninterruptible mode for thread already in queue. //Used by condition wait methods as well as acquire. 
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL //执行parkAndCheckInterrupt()方法挂起当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { interrupted = true; } } } finally { if (failed) { cancelAcquire(node); } } } ... }
(2)ReentrantLock的超时获取
在超时时间非常短的情况下,AQS不会挂起线程,而是让线程自旋去获取锁。
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } ... //Acquires the lock if it is not held by another thread within the given waiting time //and the current thread has not been Thread#interrupt interrupted. public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { //执行AQS的tryAcquireNanos()方法 return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Attempts to acquire in exclusive mode, aborting if interrupted, and failing if the given timeout elapses. //Implemented by first checking interrupt status, then invoking at least once #tryAcquire, returning on success. //Otherwise, the thread is queued, possibly repeatedly blocking and unblocking, //invoking #tryAcquire until success or the thread is interrupted or the timeout elapses. //This method can be used to implement method Lock#tryLock(long, TimeUnit). public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException(); } //如果执行AQS子类重写的tryAcquire()方法失败,才执行AQS的doAcquireNanos()方法 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } //Acquires in exclusive timed mode. 
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) { return false; } final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) {//已超时 return false; } //在超时时间非常短的情况下,AQS不会挂起线程,而是让线程自旋去获取锁; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) { //最多挂起线程nanosTimeout LockSupport.parkNanos(this, nanosTimeout); } if (Thread.interrupted()) { throw new InterruptedException(); } } } finally { if (failed) { cancelAcquire(node); } } } ... }
public class ReentrantLock implements Lock, java.io.Serializable { ... //Creates an instance of ReentrantLock. //This is equivalent to using ReentrantLock(false). public ReentrantLock() { sync = new NonfairSync(); } //Creates an instance of ReentrantLock with the given fairness policy. //@param fair true if this lock should use a fair ordering policy public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } ... }
public class ReentrantLock implements Lock, java.io.Serializable { ... //Sync object for fair locks,公平锁 static final class FairSync extends Sync { final void lock() { acquire(1); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextc); return true; } return false; } } //Sync object for non-fair locks,非公平锁 static final class NonfairSync extends Sync { //Performs lock. Try immediate barge, backing up to normal acquire on failure. final void lock() { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); } else { acquire(1); } } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { //Performs Lock#lock. The main reason for subclassing is to allow fast path for nonfair version. abstract void lock(); //Performs non-fair tryLock. //tryAcquire is implemented in subclasses, but both need nonfair try for trylock method. final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) { throw new Error("Maximum lock count exceeded"); } setState(nextc); return true; } return false; } ... } ... } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { ... //Queries whether any threads have been waiting to acquire longer than the current thread. 
//判断当前队列中是否有线程排队 public final boolean hasQueuedPredecessors() { Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //所以!hasQueuedPredecessors() 等价于: //h == t || (h.next != null && h.next.thread == Thread.currentThread()) return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } ... }
12.AQS的方法和使用总结
(1)访问或修改state的方法
(2)需要子类重写的方法
(3)AQS提供的模版方法
(4)如何使用AQS自定义独占锁
(5)独占锁的获取和释放总结
(1)访问或修改state的方法
getState():获取同步状态;setState(int newState):设置当前同步状态;compareAndSetState(int expect, int update):使用CAS设置同步状态;setExclusiveOwnerThread(Thread thread):标识加锁的线程。