JUC并发—5.AQS源码分析一

大纲

1.JUC中的Lock接口

2.如何实现具有阻塞或唤醒功能的锁

3.AQS抽象队列同步器的理解

4.基于AQS实现的ReentractLock

5.ReentractLock如何获取锁

6.AQS如何基于state变量实现可重入锁

7.AQS如何处理CAS加锁失败的线程

8.AQS的acquire()方法获取锁的流程总结

9.ReentractLock如何释放锁

10.ReentractLock的响应中断和超时获取

11.ReentractLock的公平锁FairSync

12.AQS的方法和使用总结

 

1.JUC中的Lock接口

(1)Lock接口定义了抢占锁和释放锁的方法

(2)Lock接口的实现类ReentrantLock

(3)Lock接口的实现类ReentrantReadWriteLock

(4)Lock接口的实现类StampedLock

(5)三种锁的并发度

 

(1)Lock接口定义了抢占锁和释放锁的方法

一.lock()方法

抢占锁,如果没抢到锁则阻塞。

二.tryLock()方法

尝试抢占锁,成功返回true,失败返回false。

三.unlock()方法

释放锁。

 

(2)Lock接口的实现类ReentrantLock

ReentractLock是重入锁,属于排他锁,功能和synchronized类似。但是在实际中,其实比较少会使用ReentrantLock。因为ReentrantLock的实现及性能和syncrhonized差不多,所以一般推荐使用synchronized而不是ReentrantLock。

public class ReentractLockDemo {     static int data = 0;     static ReentrantLock lock = new ReentrantLock();     public static void main(String[] args) {         new Thread() {             public void run() {                 for (int i = 0; i < 10; i++) {                     lock.lock();//获取锁                     try {                         ReentractLockDemo.data++;                         System.out.println(ReentractLockDemo.data);                     } finally {                         lock.unlock();//释放锁                     }                 }             }         }.start();         new Thread() {             public void run() {                 for (int i = 0; i < 10; i++) {                     lock.lock();//获取锁                     try {                         ReentractLockDemo.data++;                         System.out.println(ReentractLockDemo.data);                     } finally {                         lock.unlock();                     }                 }             }         }.start();     } }

(3)Lock接口的实现类ReentrantReadWriteLock

ReentrantReadWriteLock是可重入的读写锁,ReentrantReadWriteLock维护了两个锁:一是ReadLock,二是WriteLock。ReadLock和WriteLock都分别实现了Lock接口。

 

ReadLock和WriteLock适用于读多写少场景,具有特性:读读不互斥、读写互斥、写写互斥。

public class ReadWriteLockExample {     private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();     private final Lock readLock = readWriteLock.readLock();     private final Lock writeLock = readWriteLock.writeLock();     private List<String> dataList = new ArrayList<>();          public void add(String data) {         writeLock.lock();         try {             dataList.add(data);         } finally {             writeLock.unlock();         }     }          public String get(int idx) {         readLock.lock();         try {             return dataList.get(idx);         } finally {             readLock.unlock();         }     } }

(4)Lock接口的实现类StampedLock

ReentrantReadWriteLock锁有一个问题,如果当前有线程在调用get()方法,那么所有调用add()方法的线程必须要等调用get()方法的线程释放锁才能写,所以如果调用get()方法的线程非常多,那么就会导致写线程一直被阻塞。

 

StampedLock优化了ReentrantReadWriteLock的乐观锁,当有线程调用get()方法读取数据时,不会阻塞准备执行写操作的线程。

 

StampedLock提供了三种锁;

一.writeLock

功能与ReentrantReadWriteLock的写锁一样。

二.readLock

功能与ReentrantReadWriteLock的读锁一样。

三.tryOptimisticRead

当有线程获得该读锁时不会阻塞其他线程的写操作。

 

StampedLock的tryOptimisticRead()方法会返回一个stamp版本号,用来表示当前线程在读操作期间数据是否被修改过。

 

StampedLock提供了一个validate()方法来验证stamp版本号,如果线程在读取过程中没有其他线程对数据做修改,那么stamp的值不会变。

 

StampedLock使用了乐观锁的思想,避免了在读多写少场景中,大量线程占用读锁造成的写阻塞,在一定程度上提升了读写锁的并发性能。

public class Point {     private double x, y;     private final StampedLock stampedLock = new StampedLock();      public void move(double deltaX, double deltaY) {         //获得一个写锁,和ReentrantReadWriteLock相同         long stamp = stampedLock.writeLock();         try {             x += deltaX;             y += deltaY;         } finally {             stampedLock.unlock(stamp);         }     }      public double distanceFromOrigin() {         //获得一个乐观锁,不阻塞写操作         long stamp = stampedLock.tryOptimisticRead();         double currentX = x, currentY = y;         if (stampedLock.validate(stamp)) {             stamp = stampedLock.readLock();             try {                 currentX = x;                 currentY = y;             } finally {                 stampedLock.unlock(stamp);             }         } else {             //可以使用readLock()方法来获取带阻塞机制的读锁,类似于synchronized中的锁升级         }         return Math.sqrt(currentX * currentX + currentY * currentY);     } }

(5)三种锁的并发度

一.ReentrantLock

读读互斥、读写互斥、写写互斥。

二.ReentrantReadWriteLock

读读不互斥、读写互斥、写写互斥。

三.StampedLock

读读不互斥、读写不互斥、写写互斥。

 

ReentrantReadWriteLock采用的是悲观读策略。当第一个读线程获取锁后,第二个、第三个读线程还可以获取锁。这样可能会使得写线程一直拿不到锁,从而导致写线程饿死。所以其公平和非公平实现中,都会尽量避免这种情形。比如非公平锁的实现中,如果读线程在尝试获取锁时发现,AQS的等待队列中的头结点的后继结点是独占锁结点,那么读线程会阻塞。

 

StampedLock采用的是乐观读策略,类似于MVCC。读的时候不加锁,读出来发现数据被修改了,再升级为悲观锁。

 

2.如何实现具有阻塞或唤醒功能的锁

(1)需要一个共享变量标记锁的状态

(2)需要记录当前是哪个线程持有锁

(3)需要支持对线程进行阻塞和唤醒

(4)需要一个无锁队列来维护阻塞线程

 

(1)需要一个共享变量标记锁的状态

AQS有一个int变量state用来记录锁的状态,通过CAS操作确保对state变量操作的线程安全。

 

(2)需要记录当前是哪个线程持有锁

AQS有一个Thread变量exclusiveOwnerThread用来记录持有锁的线程。

 

当state = 0时,没有线程持有锁,此时exclusiveOwnerThread = null。

当state = 1时,有一线程持有锁,此时exclusiveOwnerThread = 该线程。

当state > 1,说明有一线程重入了锁。

 

(3)需要支持对线程进行阻塞和唤醒

AQS使用LockSupport工具类的park()方法和unpark()方法,通过Unsafe类提供的native方法实现阻塞和唤醒线程。

 

(4)需要一个无锁队列来维护阻塞线程

AQS通过一个双向链表和CAS实现了一个无锁的阻塞队列来维护阻塞的线程。

 

3.AQS抽象队列同步器的理解

(1)什么是AQS

AQS就是AbstractQueuedSynchronizer的缩写,中文意思就是抽象队列同步器。

 

其中ReentractLock、ReadWriteReentractLock,都是基于AQS来实现的。AQS是专门用来支撑各种Java并发类底层实现的抽象类。

 

AQS中最关键的两部分是:Node等待队列和state变量。其中Node等待队列是一个双向链表,用来存放阻塞等待的线程,而state变量则用来在加锁和释放锁时标记锁的状态。

JUC并发—5.AQS源码分析一

(2)如何理解AQS

AQS是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过一个无锁队列实现线程排队。

 

AQS的主要使用方式是继承,子类通过继承AQS并实现AQS的抽象方法来管理同步状态,子类被推荐定义为自定义同步组件的静态内部类。

 

AQS自身没有实现任何接口,AQS仅仅定义若干同步状态获取和释放的方法来供同步组件使用。AQS既可以支持独占式获取同步状态,也可以支持共享式获取同步状态。

 

AQS面向的是锁或者同步组件的实现者,基于模版方法设计模式,简化了锁的实现,屏蔽了同步状态管理、线程排队、线程等待与唤醒等操作。

 

4.基于AQS实现的ReentractLock

(1)ReentrantLock的构造函数

(2)ReentrantLock的加锁方法

(3)ReentrantLock的Sync对象

 

(1)ReentrantLock的构造函数

在ReentrantLock的构造函数中,初始化sync变量为一个NonfairSync对象。NonfairSync是非公平锁,所以ReentrantLock默认使用非公平锁。

public class ReentrantLock implements Lock, java.io.Serializable {     ...     //Synchronizer providing all implementation mechanics     private final Sync sync;          //Creates an instance of ReentrantLock.     //This is equivalent to using ReentrantLock(false).     public ReentrantLock() {         sync = new NonfairSync();     }          //Creates an instance of ReentrantLock with the given fairness policy.     public ReentrantLock(boolean fair) {         sync = fair ? new FairSync() : new NonfairSync();     }     ... }

(2)ReentrantLock的加锁方法

ReentrantLock获取锁是通过Sync的lock()方法来实现的,也就是ReentrantLock的lock()方法默认会执行继承自ReentrantLock内部类Sync的NonfairSync类的lock()方法。

public class ReentrantLock implements Lock, java.io.Serializable {     ...     //Acquires the lock.     //Acquires the lock if it is not held by another thread and returns immediately,      //setting the lock hold count to one.          //If the current thread already holds the lock,     //then the hold count is incremented by one and the method returns immediately.         //If the lock is held by another thread,     //then the current thread becomes disabled for thread scheduling purposes      //and lies dormant until the lock has been acquired, at which time the lock hold count is set to one.     public void lock() {         //执行继承自ReentrantLock内部类Sync的NonfairSync类或FairSync类的lock()方法         sync.lock();     }      //Acquires the lock unless the current thread is Thread#interrupt interrupted.     public void lockInterruptibly() throws InterruptedException {         sync.acquireInterruptibly(1);     }      //Acquires the lock only if it is not held by another thread at the time of invocation.     public boolean tryLock() {         return sync.nonfairTryAcquire(1);     }      //Acquires the lock if it is not held by another thread within the given waiting time      //and the current thread has not been Thread#interrupt interrupted.     public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {         return sync.tryAcquireNanos(1, unit.toNanos(timeout));     }      //Attempts to release this lock.     public void unlock() {         sync.release(1);     }     ... }

(3)ReentrantLock的sync变量

ReentrantLock的sync变量就是一个Sync对象,Sync类则是ReentrantLock的抽象静态内部类。Sync类继承了AQS类(抽象队列同步器),所以可以认为Sync就是AQS。此外,NonfairSync是Sync的一个子类,FairSync也是Sync的一个子类。

public class ReentrantLock implements Lock, java.io.Serializable {     ...     //Synchronizer providing all implementation mechanics     private final Sync sync;          //Base of synchronization control for this lock.      //Subclassed into fair and nonfair versions below.      //Uses AQS state to represent the number of holds on the lock.     abstract static class Sync extends AbstractQueuedSynchronizer {         ...     }          //Sync object for fair locks     static final class FairSync extends Sync {         ...     }          //Sync object for non-fair locks     static final class NonfairSync extends Sync {         ...     }     ... }  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //Head of the wait queue, lazily initialized.     //Except for initialization, it is modified only via method setHead.       //Note: If head exists, its waitStatus is guaranteed not to be CANCELLED.     private transient volatile Node head;      //Tail of the wait queue, lazily initialized.       //Modified only via method enq to add new wait node.     private transient volatile Node tail;      //The synchronization state.     private volatile int state;     ... }

 

5.ReentractLock如何获取锁

(1)compareAndSetState()方法尝试加锁

(2)setExclusiveOwnerThread()方法设置加锁线程为当前线程

 

(1)compareAndSetState()方法尝试加锁

ReentractLock是基于NonfairSync的lock()方法来实现加锁的。AQS里有一个核心的变量state,代表了锁的状态。在NonfairSync.lock()方法中,会通过CAS操作来设置state从0变为1。如果state原来是0,那么就代表此时还没有线程获取锁,当前线程执行AQS的compareAndSetState()方法便能成功将state设置为1。

 

所以AQS的compareAndSetState()方法相当于在尝试加锁。AQS的compareAndSetState()方法是基于Unsafe类来实现CAS操作的,Atomic原子类也是基于Unsafe来实现CAS操作的。

 

(2)setExclusiveOwnerThread()方法设置加锁线程为当前线程

如果执行compareAndSetState(0, 1)返回的是true,那么就说明加锁成功,于是执行AQS的setExclusiveOwnerThread()方法设置加锁线程为当前线程。

public class ReentrantLock implements Lock, java.io.Serializable {     //Synchronizer providing all implementation mechanics     private final Sync sync;     ...          //If the lock is held by another thread,     //then the current thread becomes disabled for thread scheduling purposes      //and lies dormant until the lock has been acquired, at which time the lock hold count is set to one.     public void lock() {         //执行继承自ReentrantLock内部类Sync的NonfairSync类或FairSync类的lock()方法         sync.lock();     }      //Sync object for non-fair locks     static final class NonfairSync extends Sync {         //Performs lock. Try immediate barge, backing up to normal acquire on failure.         final void lock() {             //执行AQS的compareAndSetState()方法             if (compareAndSetState(0, 1)) {                 //执行AQS的setExclusiveOwnerThread()方法                 setExclusiveOwnerThread(Thread.currentThread());             } else {                 //执行AQS的acquire()方法                 acquire(1);             }         }         ...     }     ... }  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     private static final Unsafe unsafe = Unsafe.getUnsafe();     private static final long stateOffset;     private volatile int state;      static {         try {             stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));             ...         } catch (Exception ex) {             throw new Error(ex);         }     }      //Atomically sets synchronization state to the given updated value if the current state value equals the expected value.     //This operation has memory semantics of a volatile read and write.     protected final boolean compareAndSetState(int expect, int update) {         //See below for intrinsics setup to support this         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);     }     ... }  public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //The current owner of exclusive mode synchronization.     private transient Thread exclusiveOwnerThread;          //Sets the thread that currently owns exclusive access.     //A null argument indicates that no thread owns access.     //This method does not otherwise impose any synchronization or volatile field accesses.     protected final void setExclusiveOwnerThread(Thread thread) {         exclusiveOwnerThread = thread;     }     ... }

 

6.AQS如何基于state变量实现可重入锁

(1)线程重入锁时CAS操作失败

(2)Sync的nonfairTryAcquire()实现可重入锁

 

(1)线程重入锁时CAS操作失败

假如线程1加锁后又调用ReentrantLock.lock()方法,应如何实现可重入锁?此时state = 1,故执行AQS的compareAndSetState(0, 1)方法会返回false。所以首先通过CAS操作,尝试获取锁会失败,然后返回false,于是便会执行AQS的acquire(1)方法。

 

(2)Sync的nonfairTryAcquire()实现可重入锁

在AQS的acquire()方法中,首先会执行AQS的tryAcquire()方法尝试获取锁。但AQS的tryAcquire()方法是个保护方法,需要由子类重写。所以其实会执行继承自AQS子类Sync的NonfairSync的tryAcquire()方法,而该方法最终又执行回AQS子类Sync的nonfairTryAcquire()方法。

 

在AQS子类Sync的nonfairTryAcquire()方法中:首先判断state是否为0,如果是则表明此时已释放锁,可通过CAS来获取锁。否则判断持有锁的线程是否为当前线程,如果是则对state进行累加。也就是通过对state进行累加,实现持有锁的线程可以重入锁。

public class ReentrantLock implements Lock, java.io.Serializable {     ...     //Synchronizer providing all implementation mechanics     private final Sync sync;      public void lock() {         //执行继承自ReentrantLock内部类Sync的NonfairSync类或FairSync类的lock()方法         sync.lock();     }      //NonfairSync是ReentractLock的内部类,继承自ReentractLock的另一内部类Sync     static final class NonfairSync extends Sync {         final void lock() {             //执行AQS的compareAndSetState()方法             if (compareAndSetState(0, 1)) {                 //执行AQS的setExclusiveOwnerThread()方法                 setExclusiveOwnerThread(Thread.currentThread());             } else {                 //执行AQS的acquire()方法                 acquire(1);             }         }         //判断是否是重入锁 + 是否已释放锁         protected final boolean tryAcquire(int acquires) {             //执行继承自AQS的Sync的nonfairTryAcquire()方法             return nonfairTryAcquire(acquires);         }     }      abstract static class Sync extends AbstractQueuedSynchronizer {         ...         //判断是否是重入锁 + 是否已释放锁,尝试获取锁         final boolean nonfairTryAcquire(int acquires) {             //先获取当前的线程             final Thread current = Thread.currentThread();             //获取volatile修饰的state变量值             int c = getState();             //有可能在获取当前线程和state变量值的时候,持有锁的线程释放了锁             //所以需要再次判断一下state是否为0,如果state是0,那么再次尝试加锁             if (c == 0) {//表示无锁状态                 //执行AQS的compareAndSetState()方法,CAS设置state成功表明当前线程获取到锁                 if (compareAndSetState(0, acquires)) {                     //执行AQS的setExclusiveOwnerThread()方法,保存获得锁的线程为当前线程                     setExclusiveOwnerThread(current);                     return true;                 }             }             //代码执行到这里还是没有线程释放锁,state还是 != 0             //所以先通过AQS的getExclusiveOwnerThread()方法获取当前线程             //然后再判断持有锁的线程是否为当前线程,即当前线程 == exclusiveOwnerThread             //如果持有锁的线程是当前线程,则代表当前线程在重复加锁,所以增加重入次数             else if (current == getExclusiveOwnerThread()) {                 //此时,c = 1,而nextc = c(1) + acquires(1) = 2                 //这代表的是当前线程重入锁2次,nextc代表重入次数                 int nextc = c + acquires;                 if (nextc < 0) {                     throw new Error("Maximum lock count exceeded");                 }                 //执行AQS的setState()方法,修改volatile修饰的state                 setState(nextc);                 return true;             }             return false;         }         ...     }             ... }  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //The synchronization state.     private volatile int state;      //Returns the current value of synchronization state.     //This operation has memory semantics of a volatile read.     //@return current state value     protected final int getState() {         return state;     }          //Sets the value of synchronization state.     //This operation has memory semantics of a volatile write.     //@param newState the new state value     protected final void setState(int newState) {         state = newState;     }      //Acquires in exclusive mode, ignoring interrupts.      //Implemented by invoking at least once tryAcquire, returning on success.     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success.       //This method can be used to implement method Lock#lock.     public final void acquire(int arg) {         //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁         if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {             selfInterrupt();         }     }          //Attempts to acquire in exclusive mode.      //This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it.     //This method is always invoked by the thread performing acquire.     //If this method reports failure, the acquire method may queue the thread, if it is not already queued,      //until it is signalled by a release from some other thread.      //This can be used to implement method Lock#tryLock().     //The default implementation throws UnsupportedOperationException.     protected boolean tryAcquire(int arg) {         throw new UnsupportedOperationException();     }     ... }  public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //The current owner of exclusive mode synchronization.     private transient Thread exclusiveOwnerThread;          //Sets the thread that currently owns exclusive access.     //A null argument indicates that no thread owns access.     //This method does not otherwise impose any synchronization or volatile field accesses.     protected final void setExclusiveOwnerThread(Thread thread) {         exclusiveOwnerThread = thread;     }      //Returns the thread last set by setExclusiveOwnerThread, or null if never set.     //This method does not otherwise impose any synchronization or volatile field accesses.     protected final Thread getExclusiveOwnerThread() {         return exclusiveOwnerThread;     }     ... }

 

7.AQS如何处理CAS加锁失败的线程

(1)加锁失败的线程的核心处理逻辑

(2)加锁失败的线程的具体处理流程

(3)执行AQS的addWaiter()方法维护等待队列

(4)执行AQS的acquireQueued()方法挂起线程

(5)如何处理正常唤醒和中断唤醒

 

(1)加锁失败的线程的核心处理逻辑

加锁失败时会将获取锁失败的线程维护成一个双向链表,也就是队列。同时会将线程挂起进行阻塞等待,等待被持有锁的线程释放锁时唤醒。

JUC并发—5.AQS源码分析一

(2)加锁失败的线程的具体处理流程

首先在ReentrantLock内部类NonfairSync的lock()方法中,执行AQS的compareAndSetState()方法尝试获取锁是失败的。

 

于是执行AQS的acquire()方法 -> 执行NonfairSync的tryAcquire()方法。也就是执行继承自AQS的Sync的nonfairTryAcquire()方法,进行判断是否是重入锁 + 是否已释放锁。发现也是失败的,所以继承自Sync的NonfairSync的tryAcquire()方法返回false。

 

然后在AQS的acquire()方法中,if判断的第一个条件tryAcquire()便是false,所以接着会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。也就是先执行AQS的addWaiter()方法将当前线程加入等待队列,然后再去执行AQS的acquireQueued()方法将当前线程挂起阻塞等待。

public class ReentrantLock implements Lock, java.io.Serializable {     ...     //Synchronizer providing all implementation mechanics     private final Sync sync;      public void lock() {         //执行继承ReentrantLock内部类Sync的NonfairSync类的lock()方法         sync.lock();     }     ... }  public class ReentrantLock implements Lock, java.io.Serializable {     ...     //这是ReentractLock里的内部类     static final class NonfairSync extends Sync {         final void lock() {             //执行AQS的compareAndSetState()方法             if (compareAndSetState(0, 1)) {                 //执行AQS的setExclusiveOwnerThread()方法                 setExclusiveOwnerThread(Thread.currentThread());             } else {                 //通过CAS获取锁失败时,执行AQS的acquire()方法                 acquire(1);             }         }                //判断是否是重入锁 + 是否已释放锁         protected final boolean tryAcquire(int acquires) {             //执行继承自AQS的Sync的nonfairTryAcquire()方法             return nonfairTryAcquire(acquires);         }     }      abstract static class Sync extends AbstractQueuedSynchronizer {         ...         //判断是否是重入锁 + 是否已释放锁,尝试获取锁         final boolean nonfairTryAcquire(int acquires) {             //先获取当前的线程             final Thread current = Thread.currentThread();             //获取volatile修饰的state变量值             int c = getState();             //有可能在获取当前线程和state变量值的时候,持有锁的线程释放了锁             //所以需要再次判断一下state是否为0,如果state是0,那么再次尝试加锁             if (c == 0) {//表示无锁状态                 //执行AQS的compareAndSetState()方法,CAS设置state成功表明当前线程获取到锁                 if (compareAndSetState(0, acquires)) {                     //执行AQS的setExclusiveOwnerThread()方法,保存获得锁的线程为当前线程                     setExclusiveOwnerThread(current);                     return true;                 }             }             //代码执行到这里还是没有线程释放锁,state还是 != 0             //所以先通过AQS的getExclusiveOwnerThread()方法获取当前线程             //然后再判断持有锁的线程是否为当前线程,即当前线程 == exclusiveOwnerThread             //如果持有锁的线程是当前线程,则代表当前线程在重复加锁,所以增加重入次数             else if (current == getExclusiveOwnerThread()) {                 //此时,c = 1,而nextc = c(1) + acquires(1) = 2                 //这代表的是当前线程重入锁2次,nextc代表重入次数                 int nextc = c + acquires;                 if (nextc < 0) {                     throw new Error("Maximum lock count exceeded");                 }                 //修改这个volatile修饰的state,volatile保证了可见性                 setState(nextc);                 return true;             }             return false;         }         ...     }             ... }  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //Acquires in exclusive mode, ignoring interrupts.      //Implemented by invoking at least once tryAcquire, returning on success.     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success.       //This method can be used to implement method Lock#lock.     public final void acquire(int arg) {         //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁         //当前线程加锁失败时,执行NonfairSync的tryAcquire()方法会返回false,即执行ReentrantLock内部类Sync的nonfairTryAcquire()方法返回false         //所以当前线程加锁失败时会先执行AQS的addWaiter()方法,再执行AQS的acquireQueued()方法         if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {             selfInterrupt();         }     }          //Attempts to acquire in exclusive mode.      //This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it.     //This method is always invoked by the thread performing acquire.     //If this method reports failure, the acquire method may queue the thread, if it is not already queued,      //until it is signalled by a release from some other thread.      //This can be used to implement method Lock#tryLock().     //The default implementation throws UnsupportedOperationException.     protected boolean tryAcquire(int arg) {         throw new UnsupportedOperationException();     }     ... }

ReentrantLock内部类NonfairSync的lock()方法总结:如果CAS操作失败,则说明有线程正在持有锁,此时会继续调用acquire(1)。然后通过NonfairSync的tryAcquire()方法尝试获取独占锁,也就是通过Sync的nonfairTryAcquire()方法尝试获取独占锁。如果NonfairSync的tryAcquire()方法返回false,说明锁已被占用。于是执行AQS的addWaiter()方法将当前线程封装成Node并添加到等待队列,接着执行AQS的acquireQueued()方法通过自旋尝试获取锁以及挂起线程。

 

(3)执行AQS的addWaiter()方法维护等待队列

在AQS的addWaiter()方法中:首先会将当前获取锁失败的线程封装为一个Node对象,然后判断等待队列(双向链表)的尾结点是否为空。如果尾结点不为空,则使用CAS操作 + 尾插法将Node对象插入等待队列中。如果尾结点为空或者尾结点不为空时CAS操作失败,则调用enq()方法通过自旋 + CAS构建等待队列或把Node对象插入等待队列。

 

注意:等待队列的队头是个空的Node结点,新增一个结点时会从尾部插入。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     private transient volatile Node head;     private transient volatile Node tail;     private static final long headOffset;     private static final long tailOffset;          static {         try {             headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));             tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));             ...         } catch (Exception ex) {             throw new Error(ex);         }     }          ...     //Acquires in exclusive mode, ignoring interrupts.      //Implemented by invoking at least once tryAcquire, returning on success.     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success.       //This method can be used to implement method Lock#lock.     public final void acquire(int arg) {         //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁         //当前线程加锁失败时,执行NonfairSync的tryAcquire()方法会返回false,即执行ReentrantLock内部类Sync的nonfairTryAcquire()方法返回false         //所以当前线程加锁失败时会先执行AQS的addWaiter()方法,再执行AQS的acquireQueued()方法         if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {             selfInterrupt();         }     }          //Attempts to acquire in exclusive mode.      //This method should query if the state of the object permits it to be acquired in the exclusive mode, and if so to acquire it.     //This method is always invoked by the thread performing acquire.     //If this method reports failure, the acquire method may queue the thread, if it is not already queued,      //until it is signalled by a release from some other thread.      //This can be used to implement method Lock#tryLock().     //The default implementation throws UnsupportedOperationException.     protected boolean tryAcquire(int arg) {         throw new UnsupportedOperationException();     }          //Creates and enqueues node for current thread and given mode.     //@param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared     //@return the new node     private Node addWaiter(Node mode) {         //首先将获取锁失败的线程封装为一个Node对象;         //然后使用尾插法将这个Node对象插入双向链表中,形成一个等待队列;         Node node = new Node(Thread.currentThread(), mode);         Node pred = tail;//tail是AQS等待队列的尾结点         if (pred != null) {//当tail不为空时             node.prev = pred;//把根据当前线程封装的node结点的prev指向tail             if (compareAndSetTail(pred, node)) {//通过CAS把node结点插入等待队列尾部                 pred.next = node;//把原tail的next指向node结点                 return node;             }         }         //当tail为空时,把node结点添加到等待队列         enq(node);         return node;     }          private Node enq(final Node node) {         for (;;) {             //首先获取等待队列的队尾结点             Node t = tail;             //队列的头是个空Node,新增一个结点会从尾部插入             if (t == null) {                 //初始化一个空的Node结点并赋值给head和tail                 if (compareAndSetHead(new Node())) {                     tail = head;                 }             } else {                 node.prev = t;                 //尝试比较tail变量是否为t,如果为t的话,那么tail指针就指向node                 if (compareAndSetTail(t, node)) {                     t.next = node;                     return t;                 }             }         }     }          private final boolean compareAndSetHead(Node update) {         //headOffset是head变量在AQS类的位置,判断head变量是否为null         //如果是null就将head设置为空的Node结点         //所以队列的头是个空Node,新增一个结点会从尾部插入         return unsafe.compareAndSwapObject(this, headOffset, null, update);     }          private final boolean compareAndSetTail(Node expect, Node update) {         return unsafe.compareAndSwapObject(this, tailOffset, expect, update);     }      static final class Node {         //如果当前线程获取锁失败,则会进入阻塞等待的状态,当前线程会被挂起;         //阻塞状态可以分为很多种不同的阻塞状态:         //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)         volatile int waitStatus;         //一个结点的上一个结点,prev指针,指向Node结点的上一个Node         volatile Node prev;         //一个结点的下一个结点,next指针,指向Node结点的下一个Node         volatile Node next;         //Node结点里封装的一个线程         volatile Thread thread;         //可以认为是下一个等待线程         Node nextWaiter;      }     ... }

(4)执行AQS的acquireQueued()方法挂起线程

执行完AQS的addWaiter()方法后便执行AQS的acquireQueued()方法。

 

在AQS的acquireQueued()方法中:首先会判断传入结点的上一个结点是否为等待队列的头结点。如果是,则再次调用NonfairSync的tryAcquire()方法尝试获取锁。如果获取锁成功,则将传入的Node结点从等待队列中移除。同时设置传入的Node结点为头结点,然后将该结点设置为空。从而确保等待队列的头结点是一个空的Node结点。

 

注意:NonfairSync的tryAcquire()方法会判断是否重入锁 + 是否已释放锁。

 

在AQS的acquireQueued()方法中:如果首先进行的尝试获取锁失败了,那么就执行shouldParkAfterFailedAcquire()方法判断是否要将当前线程挂起。如果需要将当前线程挂起,则会调用parkAndCheckInterrupt()方法进行挂起,也就是通过调用LockSupport的park()方法挂起当前线程。

 

需要注意的是:如果线程被中断,只会暂时设置interrupted为true。然后还要继续等待被唤醒获取锁,才能调用selfInterrupt()方法对线程中断。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //Acquires in exclusive mode, ignoring interrupts.      //Implemented by invoking at least once tryAcquire, returning on success.     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking tryAcquire until success.       //This method can be used to implement method Lock#lock.     public final void acquire(int arg) {         //首先执行继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,用来判断是否是重入锁+是否已释放锁,从而尝试获取锁         //当前线程加锁失败时,执行NonfairSync的tryAcquire()方法会返回false,即执行ReentrantLock内部类Sync的nonfairTryAcquire()方法返回false         //所以当前线程加锁失败时会先执行AQS的addWaiter()方法,再执行AQS的acquireQueued()方法         if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {             //对当前线程进行中断             selfInterrupt();         }     }          //Convenience method to interrupt current thread.     static void selfInterrupt() {         Thread.currentThread().interrupt();     }          //Acquires in exclusive uninterruptible mode for thread already in queue.      //Used by condition wait methods as well as acquire.     //@param node the node     //@param arg the acquire argument     //@return true if interrupted while waiting     final boolean acquireQueued(final Node node, int arg) {         boolean failed = true;         try {             boolean interrupted = false;             for (;;) {                 //获取node结点的上一个结点,也就是prev指针指向的结点                 final Node p = node.predecessor();                 //这里会先判断传入结点的上一个结点是否为等待队列的头结点                 //如果是,则再次调用继承自ReentrantLock内部类Sync的NonfairSync的tryAcquire()方法,尝试获取锁                 //如果获取锁成功,那么就将当前线程对应的Node结点从等待队列中移除                 if (p == head && tryAcquire(arg)) {                     //重新设置传入的Node结点为头结点,同时将该结点设置为空                     setHead(node);//线程唤醒后,头结点会后移                     p.next = null;//help GC                     failed = false;                     return interrupted;                 }                 //如果再次尝试获取锁失败,则执行shouldParkAfterFailedAcquire()方法                 //判断是否需要将当前线程挂起,然后进行阻塞等待                 //如果需要挂起,那么就会使用park操作挂起当前线程                 //执行shouldParkAfterFailedAcquire()方法会设置node结点的前驱结点的状态为SIGNAL                 //执行parkAndCheckInterrupt()方法挂起当前线程                 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {                     //如果线程被中断,只会暂时设置interrupted为true,还需要继续等待被唤醒来获取锁才能真正中断                     interrupted = true;                 }             }         } finally {             if (failed) {                 cancelAcquire(node);             }         }     }          //Sets head of queue to be node, thus dequeuing.      //Called only by acquire methods.       //Also nulls out unused fields for sake of GC and to suppress unnecessary signals and traversals.     private void setHead(Node node) {         //重新设置传入的Node结点为头结点,同时将该结点设置为空         head = node;         node.thread = null;         node.prev = null;     }          //Checks and updates status for a node that failed to acquire.     //Returns true if thread should block.      //This is the main signal control in all acquire loops.     //Requires that pred == node.prev.     //@param pred node's predecessor holding status     //@param node the node     //@return true if thread should block     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {         //pred -> 空Node         //Node结点的状态watiStatus可以分为如下几种:         //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)         //默认情况下,watiStatus应该是0,或者是空         int ws = pred.waitStatus;         //如果前驱结点的状态为SIGNAL,那么当前线程就需要被挂起进行阻塞         if (ws == Node.SIGNAL) {             return true;         }         //如果前驱结点的状态为CANCELED,那么需要移除前驱结点         if (ws > 0) {             do {                 node.prev = pred = pred.prev;             } while (pred.waitStatus > 0);             pred.next = node;         } else {             //修改前驱结点的状态为SIGNAL             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);         }         return false;     }      private final boolean parkAndCheckInterrupt() {         //LockSupport的park操作,就是将一个线程进行挂起         //必须得有另外一个线程来对当前线程执行unpark操作,才能唤醒挂起的线程         LockSupport.park(this);         return Thread.interrupted();     }     ... }

AQS的acquireQueued()方法总结:

如果当前结点的前驱结点不是队头结点或者当前线程尝试抢占锁失败,那么都会调用shouldParkAfterFailedAcquire()方法,修改当前线程结点的前驱结点的状态为SIGNAL + 决定是否应挂起当前线程。shouldParkAfterFailedAcquire()方法作用是检查当前结点的前驱结点状态。如果状态是SIGNAL,则可以挂起线程。如果状态是CANCELED,则要移除该前驱结点。如果状态是其他,则通过CAS操作修改该前驱结点的状态为SIGNAL。

 

(5)如何处理正常唤醒和中断唤醒

LockSupport的park操作,会挂起一个线程。LockSupport的unpark操作,会唤醒被挂起的线程。下面是挂起一个线程和唤醒一个线程的demo:

public static void main(String[] args) throws Exception {     final Thread thread1 = new Thread() {         public void run() {             System.out.println("挂起之前执行的动作");             LockSupport.park();             System.out.println("唤醒之前挂起的线程继续执行");         }     };     thread1.start();      Thread thread2 = new Thread() {         public void run() {             for (int i = 0; i < 10; i++) {                 try {                     System.out.println("等待"+i+"秒");                     Thread.sleep(1000);                 } catch (Exception e) {                     e.printStackTrace();                 }             }             System.out.println("尝试唤醒第一个线程");             LockSupport.unpark(thread1);         }     };     thread2.start(); }

被LockSupport.park()方法阻塞的线程被其他线程唤醒有两种情况:

情况一:其他线程调用了LockSupport.unpark()方法,正常唤醒。

情况二:其他线程调用了阻塞线程Thread的interrupt()方法,中断唤醒。

 

正是因为被LockSupport.park()方法阻塞的线程可能会被中断唤醒,所以AQS的acquireQueued()方法才写了一个for自旋。当阻塞的线程被唤醒后,如果发现自己的前驱结点是头结点,那么就去获取锁。如果获取不到锁,那么就再次阻塞自己,不断重复直到获取到锁为止。

 

被LockSupport.park()方法阻塞的线程不管是正常唤醒还是被中断唤醒,唤醒后都会通过Thread.interruptrd()方法来判断是否是中断唤醒。如果是中断唤醒,唤醒后不会立刻响应中断,而是再次获取锁,获取到锁后才能响应中断。

 

8.AQS的acquire()方法获取锁的流程总结

AQS的acquire()方法代码如下:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     ...     public final void acquire(int arg) {         //首先调用AQS子类重写的tryAcquire()方法,尝试加锁         //如果加锁失败,则调用AQS的addWaiter()方法将当前线程封装成Node结点,插入到等待队列尾部         //接着调用AQS的acquireQueued()方法,通过LockSupport的park操作挂起当前线程,让当前线程阻塞等待         if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {             selfInterrupt();         }     }     ... }

首先调用AQS子类的tryAcquire()方法尝试获取锁(是否重入 + 是否释放锁)。如果获取成功,则说明是重入锁或CAS抢占释放的锁成功,于是退出返回。如果获取失败,则调用AQS的addWaiter()方法将当前线程封装成Node结点,并通过AQS的compareAndSetTail()方法将该Node结点添加到等待队列尾部。

 

然后将该Node结点传入AQS的acquireQueued()方法,通过自旋尝试获取锁。在AQS的acquireQueued()方法中,会判断该Node结点的前驱是否为头结点。如果不是,则挂起当前线程进行阻塞。如果是,则尝试获取锁。如果获取成功,则设置当前结点为头结点,然后退出返回。如果获取失败,则继续挂起当前线程进行阻塞。

 

当被阻塞线程,被其他线程中断唤醒或其对应结点的前驱结点释放了锁,那么就继续判断该线程对应结点的前驱结点是否成为头结点。

JUC并发—5.AQS源码分析一

 

9.ReentractLock如何释放锁

(1)ReentrantLock释放锁的流程

(2)AQS的unparkSuccessor()方法

(3)AQS的release()方法总结

 

(1)ReentrantLock释放锁的流程

ReentrantLock释放锁是通过AQS的release()方法来实现的。在AQS的release()方法中,首先会执行Sync的tryRelease()方法,而Sync的tryRelease()方法会通过递减state变量的值来释放锁资源。如果Sync的tryRelease()方法返回true,也就是成功释放了锁资源,那么接下来就会调用AQS的unparkSuccessor()方法唤醒头结点的后继结点所对应的线程。

public class ReentrantLock implements Lock, java.io.Serializable {     ...     //Synchronizer providing all implementation mechanics     private final Sync sync;      //Attempts to release this lock.     //If the current thread is the holder of this lock then the hold count is decremented.      //If the hold count is now zero then the lock is released.      //If the current thread is not the holder of this lock then IllegalMonitorStateException is thrown.     public void unlock() {         //执行AQS的release()方法         sync.release(1);     }     ...          abstract static class Sync extends AbstractQueuedSynchronizer {         ...         protected final boolean tryRelease(int releases) {             //重入锁的情况:getState() == 2,releases == 1,getState() - releases = 1,c = 1             int c = getState() - releases;             //如果当前线程不等于加锁的线程,说明不是当前线程加的锁,结果当前线程来释放锁,会抛异常             if (Thread.currentThread() != getExclusiveOwnerThread()) {                 throw new IllegalMonitorStateException();             }             boolean free = false;             if (c == 0) {                 free = true;                 setExclusiveOwnerThread(null);             }             setState(c);             return free;         }         ...     } }  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //Releases in exclusive mode.     //Implemented by unblocking one or more threads if tryRelease returns true.     //This method can be used to implement method Lock#unlock.     //@param arg the release argument.  This value is conveyed to #ryRelease but is otherwise uninterpreted and can represent anything you like.     public final boolean release(int arg) {         //执行AQS的子类Sync的tryRelease()方法         if (tryRelease(arg)) {             Node h = head;             if (h != null && h.waitStatus != 0) {                 //传入头结点                 unparkSuccessor(h);             }             return true;         }         return false;     }      //Wakes up node's successor, if one exists.     private void unparkSuccessor(Node node) {         //If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling.           //It is OK if this fails or if status is changed by waiting thread.         //Node结点的状态watiStatus可以分为如下几种:         //默认(0)、CANCELED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)         //默认情况下,watiStatus应该是0,或者是空         //获得头结点的状态         int ws = node.waitStatus;         //需要设置头结点的状态为0         if (ws < 0) {             compareAndSetWaitStatus(node, ws, 0);         }          //Thread to unpark is held in successor, which is normally just the next node.           //But if cancelled or apparently null, traverse backwards from tail to find the actual non-cancelled successor.         //获取头结点的后继结点         Node s = node.next;         //如果头结点的后继结点为null或其状态为CANCELED         if (s == null || s.waitStatus > 0) {             s = null;             //那么就从尾结点开始扫描,找到距离头结点最近的 + 状态不是取消的结点             for (Node t = tail; t != null && t != node; t = t.prev) {                 if (t.waitStatus <= 0) {                     s = t;                 }             }         }         if (s != null) {             //唤醒传入的头结点的后继结点对应的线程             LockSupport.unpark(s.thread);         }     }     ... }

(2)AQS的unparkSuccessor()方法

该方法的主要工作是找出传入结点的下一个结点(状态不是取消),然后通过LockSupport.unpark()方法唤醒该结点。如果发现传入结点的下一个结点无效,则从尾结点开始扫描,找到离头结点最近的 + 状态不是取消的结点。

 

(3)AQS的release()方法总结

AQS的release()方法主要做了两件事情:

一.通过tryRelease()方法释放锁(递减state变量)

二.通过unparkSuccessor()方法唤醒等待队列中的下一个线程

 

由于是独占锁,只有持有锁的线程才有资格释放锁,所以tryRelease()方法修改state变量值时不需要使用CAS操作。

 

10.ReentractLock的响应中断和超时获取

(1)ReentractLock的响应中断

ReentractLock的lock()方法不能响应中断,但是ReentractLock的lockInterruptibly()方法可以响应中断。

 

AQS的doAcquireInterruptibly()方法和AQS的acquireQueued()方法不同的是,前者收到线程中断信号后,不再去重新竞争锁,而是直接抛出异常。

public class ReentrantLock implements Lock, java.io.Serializable {     private final Sync sync;        public ReentrantLock() {         sync = new NonfairSync();     }     ...          //Acquires the lock unless the current thread is Thread#interrupt interrupted.     public void lockInterruptibly() throws InterruptedException {         //执行AQS的acquireInterruptibly()方法         sync.acquireInterruptibly(1);     }     ... }  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //Acquires in exclusive mode, aborting if interrupted.     //Implemented by first checking interrupt status, then invoking at least once #tryAcquire, returning on success.     //Otherwise the thread is queued, possibly repeatedly blocking and unblocking, invoking #tryAcquire until success or the thread is interrupted.     //This method can be used to implement method Lock#lockInterruptibly.     public final void acquireInterruptibly(int arg) throws InterruptedException {         if (Thread.interrupted()) {             throw new InterruptedException();         }         //首先还是执行AQS子类重写的tryAcquire()方法         if (!tryAcquire(arg)) {             //执行AQS的doAcquireInterruptibly()方法             doAcquireInterruptibly(arg);         }     }      //Acquires in exclusive interruptible mode.     private void doAcquireInterruptibly(int arg) throws InterruptedException {         final Node node = addWaiter(Node.EXCLUSIVE);         boolean failed = true;         try {             for (;;) {                 final Node p = node.predecessor();                 if (p == head && tryAcquire(arg)) {                     setHead(node);                     p.next = null; // help GC                     failed = false;                     return;                 }                 //执行shouldParkAfterFailedAcquire()方法会设置node结点的前驱结点的状态为SIGNAL                 //执行parkAndCheckInterrupt()方法挂起当前线程                 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {                     //和acquireQueued()方法不同的是,这里收到线程中断信号后,不再去重新竞争锁,而是直接抛异常返回                     throw new InterruptedException();                 }             }         } finally {             if (failed) {                 cancelAcquire(node);             }         }     }      //Acquires in exclusive uninterruptible mode for thread already in queue.      //Used by condition wait methods as well as acquire.     final boolean acquireQueued(final Node node, int arg) {         boolean failed = true;         try {             boolean interrupted = false;             for (;;) {                 final Node p = node.predecessor();                 if (p == head && tryAcquire(arg)) {                     setHead(node);                     p.next = null; // help GC                     failed = false;                     return interrupted;                 }                 //执行shouldParkAfterFailedAcquire()方法设置node结点的前驱结点的状态为SIGNAL                 //执行parkAndCheckInterrupt()方法挂起当前线程                 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {                     interrupted = true;                 }             }         } finally {             if (failed) {                 cancelAcquire(node);             }         }     }     ... }

(2)ReentractLock的超时获取

在超时时间非常短的情况下,AQS不会挂起线程,而是让线程自旋去获取锁。

public class ReentrantLock implements Lock, java.io.Serializable {     private final Sync sync;        public ReentrantLock() {         sync = new NonfairSync();     }     ...          //Acquires the lock if it is not held by another thread within the given waiting time      //and the current thread has not been Thread#interrupt interrupted.     public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {         //执行AQS的tryAcquireNanos()方法         return sync.tryAcquireNanos(1, unit.toNanos(timeout));     }     ... }  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //Attempts to acquire in exclusive mode, aborting if interrupted, and failing if the given timeout elapses.     //Implemented by first checking interrupt status, then invoking at least once #tryAcquire, returning on success.     //Otherwise, the thread is queued, possibly repeatedly blocking and unblocking,      //invoking #tryAcquire until success or the thread is interrupted or the timeout elapses.     //This method can be used to implement method Lock#tryLock(long, TimeUnit).     public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {         if (Thread.interrupted()) {             throw new InterruptedException();         }         //如果执行AQS子类重写的tryAcquire()方法失败,才执行AQS的doAcquireNanos()方法         return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);     }      //Acquires in exclusive timed mode.     private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {         if (nanosTimeout <= 0L) {             return false;         }         final long deadline = System.nanoTime() + nanosTimeout;         final Node node = addWaiter(Node.EXCLUSIVE);         boolean failed = true;         try {             for (;;) {                 final Node p = node.predecessor();                 if (p == head && tryAcquire(arg)) {                     setHead(node);                     p.next = null; // help GC                     failed = false;                     return true;                 }                 nanosTimeout = deadline - System.nanoTime();                 if (nanosTimeout <= 0L) {//已超时                     return false;                 }                 //在超时时间非常短的情况下,AQS不会挂起线程,而是让线程自旋去获取锁;                 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) {                     //最多挂起线程nanosTimeout                     LockSupport.parkNanos(this, nanosTimeout);                 }                 if (Thread.interrupted()) {                     throw new InterruptedException();                 }             }         } finally {             if (failed) {                 cancelAcquire(node);             }         }     }     ... }

 

11.ReentractLock的公平锁FairSync

(1)ReentrantLock的非公平加锁策略

(2)ReentrantLock的公平加锁策略

(3)ReentrantLock的公平锁实现

 

(1)ReentrantLock的非公平加锁策略

ReentrantLock默认使用的是非公平加锁的策略。即新来抢占锁的线程,不管有没有其他线程在排队,都先通过CAS抢占锁,也就是让等待队列的队头的后继结点线程和新来抢占锁的线程进行竞争。非公平加锁策略的效率会高些,因为可以让新来的线程也有机会抢占到锁。

 

(2)ReentrantLock的公平加锁策略

如果希望每个线程过来都按照顺序去进行排队来获取锁,那么就是公平锁。ReentrantLock如果要使用公平加锁的策略,只需要在构造函数传入true。

public class ReentrantLock implements Lock, java.io.Serializable {     ...     //Creates an instance of ReentrantLock.     //This is equivalent to using ReentrantLock(false).     public ReentrantLock() {         sync = new NonfairSync();     }      //Creates an instance of ReentrantLock with the given fairness policy.     //@param fair true if this lock should use a fair ordering policy     public ReentrantLock(boolean fair) {         sync = fair ? new FairSync() : new NonfairSync();     }     ... }

(3)ReentrantLock的公平锁实现

公平锁FairSync的tryAcquire()方法,会在使用CAS操作获取锁之前,增加一个判断条件,先判断等待队列中是否有线程在排队获取锁。如果没有其他线程排队获取锁(头结点==尾结点),则当前线程可以获取锁。如果有其他线程排队获取锁,则当前线程不能获取锁,需要阻塞等待。如果等待队列的头结点的后继结点的线程是当前线程,则当前线程重入锁。

 

所以公平锁的核心实现其实就是"!hasQueuedPredecessors()"这个判断条件,每次加锁时都会首先判断:等待队列中是否有线程在排队获取锁。

public class ReentrantLock implements Lock, java.io.Serializable {     ...     //Sync object for fair locks,公平锁     static final class FairSync extends Sync {         final void lock() {             acquire(1);         }         protected final boolean tryAcquire(int acquires) {             final Thread current = Thread.currentThread();             int c = getState();             if (c == 0) {                 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {                     setExclusiveOwnerThread(current);                     return true;                 }             } else if (current == getExclusiveOwnerThread()) {                 int nextc = c + acquires;                 if (nextc < 0) {                     throw new Error("Maximum lock count exceeded");                 }                 setState(nextc);                 return true;             }             return false;         }     }      //Sync object for non-fair locks,非公平锁     static final class NonfairSync extends Sync {         //Performs lock.  Try immediate barge, backing up to normal acquire on failure.         final void lock() {             if (compareAndSetState(0, 1)) {                 setExclusiveOwnerThread(Thread.currentThread());             } else {                 acquire(1);             }         }                protected final boolean tryAcquire(int acquires) {             return nonfairTryAcquire(acquires);         }     }      abstract static class Sync extends AbstractQueuedSynchronizer {         //Performs Lock#lock. The main reason for subclassing is to allow fast path for nonfair version.         abstract void lock();          //Performs non-fair tryLock.           //tryAcquire is implemented in subclasses, but both need nonfair try for trylock method.         final boolean nonfairTryAcquire(int acquires) {             final Thread current = Thread.currentThread();             int c = getState();             if (c == 0) {                 if (compareAndSetState(0, acquires)) {                     setExclusiveOwnerThread(current);                     return true;                 }             } else if (current == getExclusiveOwnerThread()) {                 int nextc = c + acquires;                 if (nextc < 0) {                     throw new Error("Maximum lock count exceeded");                 }                 setState(nextc);                 return true;             }             return false;         }         ...     }     ... }  public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {     ...     //Queries whether any threads have been waiting to acquire longer than the current thread.     //判断当前队列中是否有线程排队     public final boolean hasQueuedPredecessors() {         Node t = tail; // Read fields in reverse initialization order         Node h = head;         Node s;         //所以!hasQueuedPredecessors() 等价于:         //h == t || (h.next != null && h.next.thread == Thread.currentThread())         return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());     }     ... }

 

12.AQS的方法和使用总结

(1)访问或修改state的方法

(2)需要子类重写的方法

(3)AQS提供的模版方法

(4)如何使用AQS自定义独占锁

(5)独占锁的获取和释放总结

 

(1)访问或修改state的方法

getState():获取同步状态 setState(int newState):设置当前同步状态 compareAndSetState(int expect, int update):使用CAS设置同步状态 setExclusiveOwnerThread(Thread thread):标识加锁的线程

(2)需要子类重写的方法

tryAcquire():独占式获取同步状态,实现该方法需要查询当前同步状态并判断同步状态是否符合预期,然后再CAS设置同步状态 tryRelease():独占式释放同步状态 tryAcquireShared():共享式获取同步状态 tryReleaseShared():共享式释放同步状态 isHeldExclusively():在独占模式下是否被在线线程占有

(3)AQS提供的模版方法

一.独占式获取与释放同步状态 acquire():独占式获取同步状态,该方法会调用子类重写的tryAcquire()方法;获取成功则返回,获取失败则当前线程会进入等待队列等待; acquireInterruptibly():与acquire()相同,会响应中断 tryAcquireNanos():在acquireInterruptibly()基础上增加超时限制 release():独占式释放同步状态   二.共享式获取与释放同步状态 acquireShared():共享式获取同步状态,与独占式区别是同一时刻可以有多个线程获取同步状态 acquireSharedInterruptibly():与acquireShared()相同,会响应中断 tryAcquireSharedNanos():在acquireSharedInterruptibly()基础上增加了超时限制 releaseShared():共享式释放同步状态   三.查询等待队列中等待的线程 getQueuedThreads()

(4)如何使用AQS自定义独占锁

独占锁就是同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于等待队列中等待。只有获取锁的线程释放了锁,后继的线程才能尝试获取锁。

 

步骤一:定义一个静态内部类Sync

在实现Lock接口的自定义Mutex类中定义一个静态内部类Sync,并且初始化这个静态内部类(子类)的一个实例。

 

步骤二:实现tryAcquire()和tryRelease()方法

该内部类Sync继承AQS,需要实现独占式获取锁和释放锁的方法。在获取锁的tryAcquire()方法中,如果成功设置state为1,则代表获取锁。在释放锁的tryRelease()方法中,需要重置state为0,持有锁线程为null。

 

步骤三:实现自定义Mutex类的lock()方法

在自定义Mutex类获取锁的lock()方法中,只需要通过Sync实例,调用AQS的acquire()方法即可。

 

(5)独占锁的获取和释放总结

一.获取锁时AQS会维护一个等待队列

获取锁失败的线程会被加入到AQS的等待队列中,然后进行自旋 + 阻塞等待。

 

二.线程移出队列 + 停止自旋的条件

这个条件就是线程对应的前驱结点为头结点并且该线程成功获取了锁。

 

三.释放锁时的处理

AQS的tryRelease()方法会释放锁,然后唤醒头结点的后继结点对应的线程。

 

发表评论

评论已关闭。

相关文章

当前内容话题
  • 0