Java源码分析系列笔记-6.ReentrantLock

1. 是什么

在jdk5之前,synchronized效率极低,于是写了ReentranLock代替。
后来jdk7优化了synchronized,参考Java源码分析系列笔记-2.锁的优化 - ThinkerQAQ - 博客园。两者性能区别不大

1.1. synchronized vs ReentranLock

比较 Synchronized ReentrantLock
等待 结合object wait/notify 结合condition await/signal
使用难度 简单。jvm会处理加锁,解锁的过程 麻烦。需要手动lock、unlock,且unlock得放在finally块中
特性 可重入 不可中断 非公平 可重入 可中断 可公平
实现原理 monitor AQS

2. 实现原理

2.1. uml图

Java源码分析系列笔记-6.ReentrantLock
由uml图可以看出ReentranLock底层是用AQS实现的,有一个Sync属性(继承AQS类),如果是非公平锁则用的NonfairSync实现类,否则用的FairSync类
具体的实现参考

3. 公平锁

所谓公平锁,遵循先到先得的原则。
即使锁已经被释放了,后到的也不能去抢占锁,得等到前面没人时才能去获取

3.1. 如何使用

public class TestReentrantLock {     private static int val = 0;     private final static Lock lock = new ReentrantLock(true);//公平锁      public static void main(String[] args) throws InterruptedException     {         Thread thread1 = new Thread(() -> {              for (int i = 0; i < 100000; i++)             {                 try                 {                     lock.lock();                     val++;                 }                 finally                 {                     lock.unlock();                 }              }         });          Thread thread2 = new Thread(() -> {              for (int i = 0; i < 100000; i++)             {                 try                 {                     lock.lock();                     val--;                 }                 finally                 {                     lock.unlock();                 }             }         });           thread1.start();         thread2.start();          thread1.join();         thread2.join();         System.out.println(val);     } } 

3.2. 原理分析

3.2.1. 构造方法

3.2.1.1. 底层使用AQS实现

public class ReentrantLock implements Lock, java.io.Serializable {      private final Sync sync;      //默认非公平锁     public ReentrantLock() {         sync = new NonfairSync();     }      public ReentrantLock(boolean fair) {         //true的话,公平锁使用FairSync,否则是NonfairSync         sync = fair ? new FairSync() : new NonfairSync();     }      //Sync是AQS的子类     abstract static class Sync extends AbstractQueuedSynchronizer {}     //FairSync是Sync的子类     static final class FairSync extends Sync {} } 

3.2.2. 加锁

  • lock
public void lock() {     //调用FairSync的lock     sync.lock(); } 

3.2.2.1. 调用公平锁的lock方法

  • FairSync.lock
final void lock() {     //调用AQS的acquire     acquire(1); } 

3.2.2.2. 调用AQS的acquire方法获取锁

  • AQS.acquire
public final void acquire(int arg) {     //调用FairSync的tryAcquire获取锁     if (!tryAcquire(arg) &&     	//获取锁失败加入AQS队列。并且死循环阻塞当前线程,等待唤醒继续获取锁         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))     	//恢复中断标记         selfInterrupt(); } 

由于FairSync重写了AQS的tryAcquire方法,因此这里会调用FairSync的tryAcquire
其他的逻辑同5.AQS.md,下面只是简要说一下主要逻辑

3.2.2.3. 尝试获取锁【只有队头才允许抢占锁--公平锁】

  • FairSync.tryAcquire
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread();     int c = getState();     //锁尚未被获取     if (c == 0) {     	//【公平锁】:队列中我的前面没人等待锁(队列为空或者我就是队列的队头)         if (!hasQueuedPredecessors() &&         	//CAS设置state获取锁成功             compareAndSetState(0, acquires)) {             //设置持有锁的线程为当前线程             setExclusiveOwnerThread(current);             return true;         }     }     //锁已经被获取,且是当前线程,那么重入     else if (current == getExclusiveOwnerThread()) {     	//增加state量         int nextc = c + acquires;         if (nextc < 0)             throw new Error("Maximum lock count exceeded");         setState(nextc);         return true;     }     //获取锁失败返回false     return false; } 

3.2.2.4. 尝试获取锁失败加入阻塞队列

  • AQS.addWaiter
private Node addWaiter(Node mode) { 	//用当前线程、EXCLUSIVE模式构造节点     Node node = new Node(Thread.currentThread(), mode);     // 队列不为空     Node pred = tail;     if (pred != null) {     	//插入到队尾         node.prev = pred;         if (compareAndSetTail(pred, node)) {             pred.next = node;             return node;         }     }     //队列为空或者插入到队尾失败     enq(node);     return node; } 
3.2.2.4.1. 入队的操作
  • enq
private Node enq(final Node node) { //死循环直到入队成功 for (;;) {     Node t = tail; 	//队列为空,那么初始化头节点。注意是new Node而不是当前node(即队头是个占位符)     if (t == null) {         if (compareAndSetHead(new Node()))             tail = head; 	//队列不为空,插入到队尾     } else {         node.prev = t;         if (compareAndSetTail(t, node)) {             t.next = node;             return t;         }     } } } 

3.2.2.5. 阻塞,等待唤醒继续获取锁

  • acquireQueued
final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         boolean interrupted = false;     	//死循环直到获取锁成功         for (;;) {         	//逻辑1.     		//当前节点的前一个节点时头节点的时候(公平锁:即我的前面没有人等待获取锁),尝试获取锁             final Node p = node.predecessor();             if (p == head && tryAcquire(arg)) {             	//获取锁成功后设置头节点为当前节点                 setHead(node);                 p.next = null; // help GC                 failed = false;                 return interrupted;             }         	//逻辑2.             //当前节点的前一个节点状态时SIGNAL(承诺唤醒当前节点)的时候,阻塞当前线程。             //什么时候唤醒?释放锁的时候             //唤醒之后干什么?继续死循环执行上面的逻辑1             if (shouldParkAfterFailedAcquire(p, node) &&                 parkAndCheckInterrupt())                 interrupted = true;         }     } finally {     	//何时执行这段逻辑?发生异常导致获取锁失败的时候         if (failed)             cancelAcquire(node);     } } 
3.2.2.5.1. 判断是否需要阻塞
  • shouldParkAfterFailedAcquire
//根据(前一个节点,当前节点)->是否阻塞当前线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {     int ws = pred.waitStatus;     //前一个节点的状态时SIGNAL,即释放锁后承诺唤醒当前节点,那么返回true可以阻塞当前线程     if (ws == Node.SIGNAL)         return true;     //前一个节点状态>0,即CANCEL。     //那么往前遍历找到没有取消的前置节点。同时从链表中移除CANCEL状态的节点     if (ws > 0) {         do {             node.prev = pred = pred.prev;         } while (pred.waitStatus > 0);         pred.next = node;     // 前置节点状态>=0,即0或者propagate。     //这里通过CAS把前置节点状态改成signal成功获取锁,失败的话再阻塞。why?     } else {         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     }     return false; } 
3.2.2.5.1.1. 阻塞当前线程
  • parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {     //使用Unsafe阻塞当前线程,这里会清除线程中断的标记,因此需要返回中断的标记     LockSupport.park(this);     return Thread.interrupted(); } 

3.2.3. 解锁

public void unlock() {     //调用AQS的release方法     sync.release(1); } 

3.2.3.1. 使用AQS释放锁

  • release
public final boolean release(int arg) {     //Sync重写了调用Sync释放锁成功     if (tryRelease(arg)) {         Node h = head;     	//队头不为空且状态正常,那么唤醒头节点         if (h != null && h.waitStatus != 0)             unparkSuccessor(h);         return true;     }     return false; } 

Sync重写了tryRelease方法,因此这里调用的是Sync.tryRelease
其他的逻辑同5.AQS.md,下面只是简要说一下主要逻辑

3.2.3.2. 尝试释放锁

  • Sync.tryRelease
protected final boolean tryRelease(int releases) {     //解锁     int c = getState() - releases;     //加锁解锁必须同一个线程     if (Thread.currentThread() != getExclusiveOwnerThread())         throw new IllegalMonitorStateException();     boolean free = false;     if (c == 0) {         //锁全部释放成功后,置占用锁的线程为空         free = true;         setExclusiveOwnerThread(null);     }     //CAS设置解锁     setState(c);     return free; } 

3.2.3.3. 释放锁成功后唤醒阻塞队列中的节点

  • AQS.unparkSuccessor
private void unparkSuccessor(Node node) {     int ws = node.waitStatus;     //当前节点的状态<0,则把状态改为0     //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何     if (ws < 0)         compareAndSetWaitStatus(node, ws, 0);        //当前节点的下一个节点为空或者状态>0(即是取消状态)     Node s = node.next;     if (s == null || s.waitStatus > 0) {         s = null;         //那么从队尾开始往前遍历找到离当前节点最近的下一个状态<=0的节点(即非取消状态)         for (Node t = tail; t != null && t != node; t = t.prev)             if (t.waitStatus <= 0)                 s = t;     } 	//唤醒下一个节点(公平锁)     if (s != null)         LockSupport.unpark(s.thread); } 

4. 非公平锁

所谓非公平锁,就是只要锁已经被释放了,那么不管是先到的还是后到的,都可以去抢占锁

4.1. 如何使用

public class TestReentrantLock {     private static int val = 0;     private final static Lock lock = new ReentrantLock();//非公平锁      public static void main(String[] args) throws InterruptedException     {         Thread thread1 = new Thread(() -> {              for (int i = 0; i < 100000; i++)             {                 try                 {                     lock.lock();                     val++;                 }                 finally                 {                     lock.unlock();                 }              }         });          Thread thread2 = new Thread(() -> {              for (int i = 0; i < 100000; i++)             {                 try                 {                     lock.lock();                     val--;                 }                 finally                 {                     lock.unlock();                 }             }         });           thread1.start();         thread2.start();          thread1.join();         thread2.join();         System.out.println(val);     } } 

4.2. 实现原理

4.2.1. 构造方法

public class ReentrantLock implements Lock, java.io.Serializable {       private final Sync sync;       //默认非公平锁     public ReentrantLock() {         sync = new NonfairSync();     }       public ReentrantLock(boolean fair) {         //true的话,公平锁使用FairSync,否则是NonfairSync         sync = fair ? new FairSync() : new NonfairSync();     }       //Sync是AQS的子类     abstract static class Sync extends AbstractQueuedSynchronizer {}     //FairSync是Sync的子类     static final class FairSync extends Sync {} } 

4.2.2. 加锁

public void lock() { 	//简单得调用Sync属性的lock方法。即NonfairSync的lock方法     sync.lock(); } 

4.2.2.1. 使用非公平锁加锁

  • NonfairSync lock方法
final void lock() { 	//获取锁。使用CAS设置state的值为1,这里state代表互斥量     if (compareAndSetState(0, 1))     	//设置当前线程为拥有互斥量的线程         setExclusiveOwnerThread(Thread.currentThread());     else     	//获取失败则调用AQS的acquire方法         acquire(1); } 

4.2.2.2. 通过AQS加锁

  • AQS.acquire方法
public final void acquire(int arg) { 	//调用NonFairSync的tryAcquire获取锁     if (!tryAcquire(arg) &&     	//获取锁失败加入AQS队列。并且死循环阻塞当前线程,等待唤醒继续获取锁         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))     	//恢复中断标记         selfInterrupt(); } 

由于NonfairSync重写了AQS的tryAcquire方法,因此这里会调用NonfairSync的tryAcquire
其他的逻辑同5.AQS.md,下面只是简要说一下主要逻辑

4.2.2.3. 通过非公平锁尝试加锁

  • NonfairSync.tryAcquire
protected final boolean tryAcquire(int acquires) {     //调用NonfairSync.nonfairTryAcquire     return nonfairTryAcquire(acquires); } 
4.2.2.3.1. 非公平锁尝试加锁的操作【不管是否队头都可以抢占锁--非公平锁】
  • NonfairSync.nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {     final Thread current = Thread.currentThread();     int c = getState();     //锁尚未被获取     if (c == 0) {     	//不管前面是否有人等待,直接尝试获取锁(非公平锁)         if (compareAndSetState(0, acquires)) {             setExclusiveOwnerThread(current);             return true;         }     }     //锁已被获取且时当前线程,重入     else if (current == getExclusiveOwnerThread()) {         int nextc = c + acquires;         if (nextc < 0) // overflow             throw new Error("Maximum lock count exceeded");         setState(nextc);         return true;     }     return false; } 

4.2.2.4. 尝试加锁失败,加入阻塞队列

  • AQS.addWaiter
 private Node addWaiter(Node mode) {  	//用当前线程、EXCLUSIVE模式构造节点     Node node = new Node(Thread.currentThread(), mode);     // 队列不为空     Node pred = tail;     if (pred != null) {     	//插入到队尾         node.prev = pred;         if (compareAndSetTail(pred, node)) {             pred.next = node;             return node;         }     }     //队列为空或者插入到队尾失败     enq(node);     return node; } 
4.2.2.4.1. 加入队列的操作
  • AQS.enq
private Node enq(final Node node) { 	//死循环直到入队成功     for (;;) {         Node t = tail;     	//队列为空,那么初始化头节点。注意是new Node而不是当前node(即队头是个占位符)         if (t == null) {             if (compareAndSetHead(new Node()))                 tail = head; 		//队列不为空,插入到队尾         } else {             node.prev = t;             if (compareAndSetTail(t, node)) {                 t.next = node;                 return t;             }         }     } } 
  • acquireQueued
final boolean acquireQueued(final Node node, int arg) {     boolean failed = true;     try {         boolean interrupted = false;     	//死循环直到获取锁成功         for (;;) {         	//逻辑1.     		//当前节点的前一个节点时头节点的时候(公平锁:即我的前面没有人等待获取锁),尝试获取锁             final Node p = node.predecessor();             if (p == head && tryAcquire(arg)) {             	//获取锁成功后设置头节点为当前节点                 setHead(node);                 p.next = null; // help GC                 failed = false;                 return interrupted;             }         	//逻辑2.             //当前节点的前一个节点状态时SIGNAL(承诺唤醒当前节点)的时候,阻塞当前线程。             //什么时候唤醒?释放锁的时候             //唤醒之后干什么?继续死循环执行上面的逻辑1             if (shouldParkAfterFailedAcquire(p, node) &&                 parkAndCheckInterrupt())                 interrupted = true;         }     //如果发生了异常,那么执行下面的逻辑     } finally {     	//除了获取锁成功的情况都会执行cancelAcquire方法         if (failed)             cancelAcquire(node);     } } 
  • shouldParkAfterFailedAcquire
//根据(前一个节点,当前节点)->是否阻塞当前线程 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {     int ws = pred.waitStatus; 	//前一个节点的状态时SIGNAL,即释放锁后承诺唤醒当前节点,那么返回true可以阻塞当前线程     if (ws == Node.SIGNAL)         return true;     //前一个节点状态>0,即CANCEL。     //那么往前遍历找到没有取消的前置节点。同时从链表中移除CANCEL状态的节点     if (ws > 0) {         do {             node.prev = pred = pred.prev;         } while (pred.waitStatus > 0);         pred.next = node; 	// 前置节点状态>=0,即0或者propagate。 	//这里通过CAS把前置节点状态改成signal成功获取锁,失败的话再阻塞。why?     } else {         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     }     return false; } 
  • parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() { 	//使用Unsafe阻塞当前线程,这里会清除线程中断的标记,因此需要返回中断的标记     LockSupport.park(this);     return Thread.interrupted(); } 

4.2.3. 解锁

public void unlock() { 		//简单得调用AQS的release方法         sync.release(1);     } 

4.2.3.1. 使用AQS释放锁

  • release
public final boolean release(int arg) { 	//调用Sync释放锁成功     if (tryRelease(arg)) {         Node h = head; 		//队头不为空且状态正常,那么唤醒头节点         if (h != null && h.waitStatus != 0)             unparkSuccessor(h);         return true;     }     return false; } 

4.2.3.2. 尝试释放锁

  • Sync.tryRelease
 protected final boolean tryRelease(int releases) { 	//解锁     int c = getState() - releases; 	//加锁解锁必须同一个线程     if (Thread.currentThread() != getExclusiveOwnerThread())         throw new IllegalMonitorStateException();     boolean free = false;     if (c == 0) {         free = true;         setExclusiveOwnerThread(null);     }     //CAS设置解锁     setState(c);     return free; } 
4.2.3.2.1. 释放锁成功后唤醒阻塞队列中的后续节点
  • unparkSuccessor
private void unparkSuccessor(Node node) {         int ws = node.waitStatus;         //当前节点的状态<0,则把状态改为0         //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何         if (ws < 0)             compareAndSetWaitStatus(node, ws, 0);              //当前节点的下一个节点为空或者状态>0(即是取消状态)         Node s = node.next;         if (s == null || s.waitStatus > 0) {             s = null;             //那么从队尾开始往前遍历找到离当前节点最近的下一个状态<=0的节点(即非取消状态)             for (Node t = tail; t != null && t != node; t = t.prev)                 if (t.waitStatus <= 0)                     s = t;         }     	//唤醒下一个节点(非公平锁也这样?)         if (s != null)             LockSupport.unpark(s.thread);     } 

5. 参考

发表评论

评论已关闭。

相关文章