ReentrantLock介绍及源码解析

ReentrantLock介绍及源码解析

一、ReentrantLock介绍

  • ReentrantLock是JUC包下的一个并发工具类,可以通过他显示的加锁(lock)和释放锁(unlock)来实现线程的安全访问,ReentrantLock还可以实现公平锁和非公平锁,并且其与synchronized的作用是一致的,区别在于加锁的底层实现不一样,写法上也不一样,具体异同可以参见下图:

ReentrantLock介绍及源码解析

二、ReentrantLock的源码简析

1、源码分析

  • ReentrantLock(下面简称RL)就是AQS独占锁的一个典型实现,其通过维护state变量的值来判断当前线程是否能够拥有锁,如果通过cas将state成功从0变成1表示争用资源成功,否则表示争用失败,进入CLH队列,通过CLH队列来维护那些暂时没抢占到锁资源的线程;其内部维护了一个名为Sync的内部类来继承AQS,又因为RL既可以支持公平锁也可以支持非公平锁,所以其内部还维护了两个内部类FairSync和NonfairSync来继承Sync,通过他们来实现AQS的模板方法从而实现加锁的过程;类的关系图如下:

ReentrantLock介绍及源码解析

  • 公平锁和非公平锁在源码层的两点区别:

    1、非公平上来直接抢锁

    2、当state=0时,非公平直接抢,公平锁还会判断队列还有没有前置节点

2、lock方法的源码跟踪

下面就让我们跟踪RL的lock()和unLock()源码来看看代码级别是怎么实现的吧!

需要注意的是,本文跟踪的是非公平锁的加解锁过程,公平锁的实现大体一致,当源码中有与公平锁的显著差别时我会通过注释给出解释

  • 试用例如下
public static void main(String[] args) throws InterruptedException {     long start = System.currentTimeMillis();     List<Thread> list = new ArrayList<>();     ReentrantLock lock = new ReentrantLock();     for (int i = 0; i < 1000; i++) {         Thread thread = new Thread(()-> {             for (int j = 0; j < 1000; j++) {                 // 解锁                 lock.lock();                 count++;                 // 释放锁                 lock.unlock();             }         });         list.add(thread);     }     for (Thread thread : list) {         thread.start();     }     for (Thread thread : list) {         thread.join();     }     System.out.println("auto.count = " + count + "耗时:" + (System.currentTimeMillis() -start)); } 

(1)、lock()的源码跟踪与解析

跟踪lock.lock()发现其调用的是内部类Sync的lock()方法,该方法是一个抽象方法,具体实现由FairSync和NonfairSync实现,由于我们构造RL时调用的是无参构造函数,所以这里会直接进入NonfairSync的lock()方法;具体实现代码和注释如下:

/**  * java.util.concurrent.locks.ReentrantLock.NonfairSync#lock()  */ final void lock() {     // 由于是非公平锁所以这里上来直接争抢资源,尝试通过CAS操作将state的值由0变成1     if (compareAndSetState(0, 1))          // 如果成功将state值变成1表示争抢锁成功,设置当前拥有独占访问权的线程。         setExclusiveOwnerThread(Thread.currentThread());     else         // 争抢失败再进入与公平锁一样的排队逻辑         acquire(1); } 

tips:

1、上面的compareAndSetState方法也是由AQS提供的,里面借助Unsafe实现了对state的cas操作更新

2、setExclusiveOwnerThread也可以理解成由AQS提供(其实是AQS的父类,不过不影响理解),给exclusiveOwnerThread变量赋值,exclusiveOwnerThread表示当前正在拥有锁的线程

3、acquire方法同样由AQS提供,其内部实现也是lock环节比较关键的代码,下面我会详细解释

(2)、acquire()的源码跟踪与解析

acquire方法的源码如下:

/**  *  java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire(int)  */ public final void acquire(int arg) {     /**      * 1、尝试获取锁;如果成功此方法结束,当前线程执行同步代码块      * 2、如果获取失败,则构造Node节点并加入CLH队列      * 3、然后继续等待锁      */     if (!tryAcquire(arg) &&         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))         // 如果获取锁失败,添加CLH队列也失败,那么直接中断当前线程         selfInterrupt(); } 

tips:

1、tryAcquire方法是AQS的一个模板方法,RL下的公平和非公平锁都有不同的实现,下面会详解

2、addWaiter方法是AQS的一个默认实现方法,负责构造当前线程所在的Node,并将其设置到队列的尾巴上

3、acquireQueued方法也是AQS的默认实现,旨在设置CLH队列的head和阻塞当前线程

上面的三个方法下面也会一一介绍

(3)、tryAcquire()的源码跟踪与解析

  • tryAcquire()方法可以理解成尝试获取锁,如果获取成功即表示当前线程拥有了锁;跟踪源码需要注意的一点是:该方法在非公平锁(NonFairSync)下的实现最终调用的是Sync里的nonfairTryAcquire方法,所以我们直接观察该方法是如何实现的即可
/**  * java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire(int)  */ final boolean nonfairTryAcquire(int acquires) {     // 当前线程     final Thread current = Thread.currentThread();     // 获取当前state的值     int c = getState();     if (c == 0) {         /**          * 非公平锁发现资源未被占用时直接CAS尝试抢占资源;而公平锁发现资源未被占用时          * 先判断队列里是否还有前置节点再等待,没有才会去抢占资源          */         if (compareAndSetState(0, acquires)) {             // 如果成功将state值变成1表示争抢锁成功,设置当前拥有独占访问权的线程。             setExclusiveOwnerThread(current);             return true;         }     }     /**       * 如果state!=0表示有争用,再判断当前系统拥有独占权限的线程是不是当前线程,      * 如果是,则需要支持线程重入,将state的值加1      */     else if (current == getExclusiveOwnerThread()) {// 处理可重入的逻辑         int nextc = c + acquires;         if (nextc < 0) // overflow             throw new Error("Maximum lock count exceeded");         setState(nextc);         return true;     }     // state既不等于0也不需要重入则返回false;表示获取锁失败,代码返回后继续执行acquireQueued方法     return false; } 

(4)addWaiter()的源码跟踪与解析

  • 执行到addWaiter方法表示前面的tryAcquire尝试获取锁失败了,需要由此方法构建Node节点并加入到CLH队列的末尾;此方法返回的Node即为当前CLH队列的tail节点
/**  * java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node)  */ private Node addWaiter(Node mode) {     // 构建Node对象     Node node = new Node(Thread.currentThread(), mode);     /**      * 将当前队列的尾节点赋值给pred,通过命名和下面的代码其实可以发现就是想让tail作为当前节点的前置节点;      * 但是为什么不直接用tail而将其赋值给pred再用呢?我想应该是考虑并发环境下tail的引用有可能会被其他线程改变      */     Node pred = tail;     if (pred != null) {         // 如果当前队列的尾结点(tail)不为空,就将其作为当前Node节点的前置节点         node.prev = pred;         // 然后通过AQS自带的cas方法将当前构建的Node节点插入到队列的尾巴上         if (compareAndSetTail(pred, node)) {             // 如果成功了,前置节点也就是之前的tail节点的后继节点就是当前节点,赋值             pred.next = node;             // 返回构建的Node节点,即当前队列的tail节点             return node;         }     }     // 如果队列的tail节点为空,或者cas设置tail节点失败的话调用此方法;旨在重新设置队列的tail节点     enq(node);     return node; } 

(5)、acquireQueued()的源码跟踪与解析

  • 当线程通过tryAcquire上锁失败,然后通过addWaiter将当前线程添加到队列末尾后,通过此方法再次判断是否轮到当前节点,并再次尝试获取锁,获取不到的话进行阻塞操作,源码与注释如下:
/**  * java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node, int)  */ final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         boolean interrupted = false;         for (;;) {             // 获取tail节点的前置节点             final Node p = node.predecessor();             /**              * 如果前置节点就是头节点表示当前tail节点就是第二个节点,就可以尝试着去获取锁,              * 然后将tail节点设置成头节点,返回线程中断状态为false;表示当前线程获取到锁              */             if (p == head && tryAcquire(arg)) {                 setHead(node);                 /**                  * 既然tail已经获取到锁了,那么前置节点就没用了,这里将前置节点的next设置为空,                  * 是为了方便垃圾回收,因为如果不指定为空,前置节点的next就是当前的tail节点,                  * 不会被回收                  */                 p.next = null; // help GC                 failed = false;                 return interrupted;             }             /**              * 如果前置节点不为head,或者虽然前置节点是head但是获取锁失败,那么就              * 需要在这里将线程阻塞,阻塞利用的是LockSupport.park(thread)来实现的              */             if (shouldParkAfterFailedAcquire(p, node) &&                 parkAndCheckInterrupt())                 interrupted = true;         }     } finally {         if (failed)             // 退出获取锁             cancelAcquire(node);     } } 

至此,RL非公平锁加锁的过程的源码跟踪完毕,流程也不算复杂,下面简单梳理一遍:

1、上来直接尝试获取锁(修改state值),成功表示获取成功

2、否则执行tryAcquire方法尝试通过cas的方式获取锁,并处理可能存在的重入操作

3、获取失败则通过addWriter方法构建Node节点并加入CLH队列的末尾

4、然后在acquireQueued里再次获取锁,获取失败则阻塞当前线程;

下面简单画了一下lock()方法的调用泳道图

1、调用父类AQS的compareAndSetState通过cas的模式尝试将state状态改为1,修改成功则持有锁,将当前线程设为ExclusiveOwnerThread

ReentrantLock介绍及源码解析

3、unLock方法的源码跟踪

  • 释放锁其实就是将state状态减1,然后处理可重入逻辑,如果没有重入的话直接唤醒当前队列的head节点,把当前线程所在的Node节点从队列中剔除
  • unLock方法对应AQS的tryRelease模板方法的实现,其没有lock那么复杂,因为不用支持公平和非公平锁,所以其可以直接在sync中调用AQS提供的release方法,然后触发tryRelease,调用sync里的tryRelease实现从而实现解锁

AQS的release源码

/**  * java.util.concurrent.locks.AbstractQueuedSynchronizer#release(int)  */ public final boolean release(int arg) {     // 尝试释放锁     if (tryRelease(arg)) {         // 释放成功,判断当前队列头节点是否为空,不为空并且等待状态不等于0则唤醒当前队列的头节点         Node h = head;         if (h != null && h.waitStatus != 0)             unparkSuccessor(h);         return true;     }     return false; } 

RL的tryRelease实现

/**  * java.util.concurrent.locks.ReentrantLock.Sync#tryRelease(int)  * @param releases  * @return  */ protected final boolean tryRelease(int releases) {     // state减1     int c = getState() - releases;     // 如果当前线程不是正在获取到锁的线程直接抛异常     if (Thread.currentThread() != getExclusiveOwnerThread())         throw new IllegalMonitorStateException();     boolean free = false;     // 如果state减1后等于0表示没有重入,表示释放锁成功,将当前获取锁的线程置空     if (c == 0) {         free = true;         setExclusiveOwnerThread(null);     }     // 将最新的state状态更新到AQS中     setState(c);     return free; } 

unlock()总结:

1、调用父类AQS的release方法实际调用的是tryRelease这个模板方法由ReentrantLock本身实现

2、tryRelease方法尝试将state减1,如果减完等于0表示解锁成功,将ExclusiveOwner线程设为空;并且唤醒队列的头节点(unparkSuccessor)。

3、如果不等于0表示解锁失败,将state设为减1过后的值;也是为了可重入

发表评论

评论已关闭。

相关文章

当前内容话题