Java 多线程:锁(三)

Java 多线程:锁(三)

作者:Grey

原文地址:

博客园:Java 多线程:锁(三)

CSDN:Java 多线程:锁(三)

StampedLock

StampedLock其实是对读写锁的一种改进,它支持在读同时进行一个写操作,也就是说,它的性能将会比读写锁更快。

更通俗的讲就是在读锁没有释放的时候是可以获取到一个写锁,获取到写锁之后,读锁阻塞,这一点和读写锁一致,唯一的区别在于读写锁不支持在没有释放读锁的时候获取写锁

StampedLock 有三种模式:

  • 悲观读:允许多个线程获取悲观读锁。

  • 写锁:写锁和悲观读是互斥的。

  • 乐观读:无锁机制,类似于数据库中的乐观锁,它支持在不释放乐观读的时候是可以获取到一个写锁。

参考: 有没有比读写锁更快的锁?

示例代码:

悲观读 + 写锁:

package git.snippets.juc;  import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.StampedLock; import java.util.logging.Logger;  // 悲观读 + 写锁 public class StampedLockPessimistic {     private static final Logger log = Logger.getLogger(StampedLockPessimistic.class.getName());     private static final StampedLock lock = new StampedLock();     //缓存中存储的数据     private static final Map<String, String> mapCache = new HashMap<>();     //模拟数据库存储的数据     private static final Map<String, String> mapDb = new HashMap<>();      static {         mapDb.put("zhangsan", "你好,我是张三");         mapDb.put("sili", "你好,我是李四");     }      private static void getInfo(String name) {         //获取悲观读         long stamp = lock.readLock();         log.info("线程名:" + Thread.currentThread().getName() + " 获取了悲观读锁" + "    用户名:" + name);         try {             if ("zhangsan".equals(name)) {                 log.info("线程名:" + Thread.currentThread().getName() + " 休眠中" + "    用户名:" + name);                 Thread.sleep(3000);                 log.info("线程名:" + Thread.currentThread().getName() + " 休眠结束" + "    用户名:" + name);             }             String info = mapCache.get(name);             if (null != info) {                 log.info("在缓存中获取到了数据");                 return;             }         } catch (InterruptedException e) {             log.info("线程名:" + Thread.currentThread().getName() + " 释放了悲观读锁");             e.printStackTrace();         } finally {             //释放悲观读             lock.unlock(stamp);         }          //获取写锁         stamp = lock.writeLock();         log.info("线程名:" + Thread.currentThread().getName() + " 获取了写锁" + "    用户名:" + name);         try {             //判断一下缓存中是否被插入了数据             String info = mapCache.get(name);             if (null != info) {                 log.info("获取到了写锁,再次确认在缓存中获取到了数据");                 return;             }             //这里是往数据库获取数据             String infoByDb = mapDb.get(name);             //将数据插入缓存             mapCache.put(name, infoByDb);             log.info("缓存中没有数据,在数据库获取到了数据");         } finally {             //释放写锁             log.info("线程名:" + Thread.currentThread().getName() + " 释放了写锁" + "     用户名:" + name);             lock.unlock(stamp);         }     }      public static void main(String[] args) { //线程1         Thread t1 = new Thread(() -> {             getInfo("zhangsan");         });          //线程2         Thread t2 = new Thread(() -> {             getInfo("lisi");         });          //线程启动         t1.start();         t2.start();          //线程同步         try {             t1.join();             t2.join();         } catch (InterruptedException e) {             e.printStackTrace();         }     } }  

乐观读:

package git.snippets.juc;  import java.util.concurrent.locks.StampedLock; import java.util.logging.Logger;  // 乐观写 public class StampedLockOptimistic {     private static final Logger log = Logger.getLogger(StampedLockOptimistic.class.getName());     private static final StampedLock lock = new StampedLock();     private static int num1 = 1;     private static int num2 = 1;      /**      * 修改成员变量的值,+1      *      * @return      */     private static int sum() {         log.info("求和方法被执行了");         //获取乐观读         long stamp = lock.tryOptimisticRead();         int cnum1 = num1;         int cnum2 = num2;         log.info("获取到的成员变量值,cnum1:" + cnum1 + "   cnum2:" + cnum2);         try {             //休眠3秒,目的是为了让其他线程修改掉成员变量的值。             Thread.sleep(3000);         } catch (InterruptedException e) {             e.printStackTrace();         }         //判断在运行期间是否存在写操作   true:不存在   false:存在         if (!lock.validate(stamp)) {             log.info("存在写操作!");             //存在写锁             //升级悲观读锁             stamp = lock.readLock();             try {                 log.info("升级悲观读锁");                 cnum1 = num1;                 cnum2 = num2;                 log.info("重新获取了成员变量的值=========== cnum1=" + cnum1 + "    cnum2=" + cnum2);             } finally {                 //释放悲观读锁                 lock.unlock(stamp);             }         }         return cnum1 + cnum2;     }      //使用写锁修改成员变量的值     private static void updateNum() {         long stamp = lock.writeLock();         try {             num1 = 2;             num2 = 2;         } finally {             lock.unlock(stamp);         }     }       public static void main(String[] args) throws InterruptedException {         Thread t1 = new Thread(() -> {             int sum = sum();             log.info("求和结果:" + sum);         });         t1.start();         //休眠1秒,目的为了让线程t1能执行到获取成员变量之后         Thread.sleep(1000);         updateNum();         t1.join();         log.info("执行完毕");      }  }  

使用 StampedLock 的注意事项

  1. 看名字就能看出来StampedLock不支持重入锁。

  2. 它适用于读多写少的情况,如果不是这种情况,请慎用,性能可能还不如synchronized

  3. StampedLock的悲观读锁、写锁不支持条件变量。

  4. 千万不能中断阻塞的悲观读锁或写锁,如果调用阻塞线程的interrupt(),会导致cpu飙升,如果希望StampedLock支持中断操作,请使用readLockInterruptibly(悲观读锁)与writeLockInterruptibly(写锁)。

CountDownLatch

类似门闩的概念,可以替代join,但是比join灵活,因为一个线程里面可以多次countDown,但是join一定要等线程完成才能执行。

其底层原理是:调用await()方法的线程会利用AQS排队,一旦数字减为0,则会将AQS中排队的线程依次唤醒。

代码如下:

package git.snippets.juc;  import java.util.concurrent.CountDownLatch;  /**  * CountDownLatch可以用Join替代  */ public class CountDownLatchAndJoin {     public static void main(String[] args) {         useCountDownLatch();         useJoin();     }      public static void useCountDownLatch() {         // use countdownlatch         long start = System.currentTimeMillis();         Thread[] threads = new Thread[100000];         CountDownLatch latch = new CountDownLatch(threads.length);          for (int i = 0; i < threads.length; i++) {             threads[i] = new Thread(() -> {                 int result = 0;                 for (int i1 = 0; i1 < 1000; i1++) {                     result += i1;                 }                 // System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);                 latch.countDown();             });         }         for (Thread thread : threads) {             thread.start();         }         try {             latch.await();         } catch (InterruptedException e) {             e.printStackTrace();         }         long end = System.currentTimeMillis();          System.out.println("end latch down, time is " + (end - start));      }      public static void useJoin() {         long start = System.currentTimeMillis();          // use join         Thread[] threads = new Thread[100000];          for (int i = 0; i < threads.length; i++) {             threads[i] = new Thread(() -> {                 int result = 0;                 for (int i1 = 0; i1 < 1000; i1++) {                     result += i1;                 }                 // System.out.println("Current thread " + Thread.currentThread().getName() + " finish cal result " + result);             });         }         for (Thread thread : threads) {             thread.start();         }         for (Thread thread : threads) {             try {                 thread.join();             } catch (InterruptedException e) {                 e.printStackTrace();             }         }          long end = System.currentTimeMillis();          System.out.println("end join, time is " + (end - start));     } }  

CyclicBarrier

类似栅栏,类比:满了20个乘客就发车 这样的场景。

比如:一个程序可能收集如下来源的数据:

  1. 数据库

  2. 网络

  3. 文件

程序可以并发执行,用线程操作1,2,3,然后操作完毕后再合并, 然后执行后续的逻辑操作,就可以使用CyclicBarrier

代码如下:

package git.snippets.juc;  import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;  /**  * CyclicBarrier示例:满员发车  *  * @author <a href="mailto:410486047@qq.com">Grey</a>  * @since 1.8  */ public class CyclicBarrierTest {     public static void main(String[] args) {         CyclicBarrier barrier = new CyclicBarrier(20, () -> {             System.out.println("满了20,发车");         });         for (int i = 0; i < 100; i++) {             new Thread(() -> {                 try {                     barrier.await();                 } catch (InterruptedException | BrokenBarrierException e) {                     e.printStackTrace();                 }             }).start();         }     } } 

Semaphore

表示信号量,有如下两个操作:

s.acquire() 信号量减1

s.release()信号量加1

到 0 以后,就不能执行了,这个可以用于限流

底层原理是:如果没有线程许可可用,则线程阻塞,并通过 AQS 来排队,可以通过release()方法来释放许可,当某个线程释放了某个许可后,会从 AQS 中正在排队的第一个线程依次开始唤醒,直到没有空闲许可。

Semaphore 使用示例:有N个线程来访问,我需要限制同时运行的只有信号量大小的线程数。

代码如下:

package git.snippets.juc;  import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;  /**  * Semaphore用于限流  */ public class SemaphoreUsage {     public static void main(String[] args) {         Semaphore semaphore = new Semaphore(1);         new Thread(() -> {             try {                 semaphore.acquire();                 TimeUnit.SECONDS.sleep(2);                 System.out.println("Thread 1 executed");             } catch (Exception e) {                 e.printStackTrace();             } finally {                 semaphore.release();             }         }).start();          new Thread(() -> {             try {                 semaphore.acquire();                 TimeUnit.SECONDS.sleep(2);                 System.out.println("Thread 2 executed");             } catch (Exception e) {                 e.printStackTrace();             } finally {                 semaphore.release();             }         }).start();     } } 

Semaphore可以有公平非公平的方式进行配置。

SemaphoreCountDownLatch的区别?

Semaphore 是信号量,可以做限流,限制 n 个线程并发,释放一个线程后就又能进来一个新的线程。

CountDownLatch 是闭锁,带有阻塞的功能,必须等到 n 个线程都执行完后,被阻塞的线程才能继续往下执行。

Guava RateLimiter

采用令牌桶算法,用于限流

示例代码如下

package git.snippets.juc;  import com.google.common.util.concurrent.RateLimiter; import java.util.List; import java.util.concurrent.Executor;  /**  * @author <a href="mailto:410486047@qq.com">Grey</a>  * @date 2021/4/21  * @since  */ public class RateLimiterUsage {     //每秒只发出2个令牌     static final RateLimiter rateLimiter = RateLimiter.create(2.0);     static void submitTasks(List<Runnable> tasks, Executor executor) {         for (Runnable task : tasks) {             rateLimiter.acquire(); // 也许需要等待             executor.execute(task);         }     } }  

注:上述代码需要引入 Guava 包

Phaser(Since jdk1.7)

遗传算法,可以用这个结婚的场景模拟: 假设婚礼的宾客有 5 个人,加上新郎和新娘,一共 7 个人。 我们可以把这 7 个人看成 7 个线程,有如下步骤要执行。

  1. 到达婚礼现场

  2. 吃饭

  3. 离开

  4. 拥抱(只有新郎和新娘线程可以执行)

每个阶段执行完毕后才能执行下一个阶段,其中拥抱阶段只有新郎新娘这两个线程才能执行。

以上需求,我们可以通过 Phaser 来实现,具体代码和注释如下:

package git.snippets.juc;  import java.util.Random; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit;  public class PhaserUsage {     static final Random R = new Random();     static WeddingPhaser phaser = new WeddingPhaser();      static void millSleep() {         try {             TimeUnit.MILLISECONDS.sleep(R.nextInt(1000));         } catch (InterruptedException e) {             e.printStackTrace();         }     }      public static void main(String[] args) {         // 宾客的人数         final int guestNum = 5;         // 新郎和新娘         final int mainNum = 2;         phaser.bulkRegister(mainNum + guestNum);         for (int i = 0; i < guestNum; i++) {             new Thread(new Person("宾客" + i)).start();         }         new Thread(new Person("新娘")).start();         new Thread(new Person("新郎")).start();     }      static class WeddingPhaser extends Phaser {         @Override         protected boolean onAdvance(int phase, int registeredParties) {             switch (phase) {                 case 0:                     System.out.println("所有人到齐");                     return false;                 case 1:                     System.out.println("所有人吃饭");                     return false;                 case 2:                     System.out.println("所有人离开");                     return false;                 case 3:                     System.out.println("新郎新娘拥抱");                     return true;                 default:                     return true;             }         }     }      static class Person implements Runnable {         String name;          Person(String name) {             this.name = name;         }          @Override         public void run() {             // 先到达婚礼现场             arrive();             // 吃饭             eat();             // 离开             leave();             // 拥抱,只保留新郎和新娘两个线程可以执行             hug();         }          private void arrive() {             millSleep();             System.out.println("name:" + name + " 到来");             phaser.arriveAndAwaitAdvance();         }          private void eat() {             millSleep();             System.out.println("name:" + name + " 吃饭");             phaser.arriveAndAwaitAdvance();         }          private void leave() {             millSleep();             System.out.println("name:" + name + " 离开");             phaser.arriveAndAwaitAdvance();         }          private void hug() {             if ("新娘".equals(name) || "新郎".equals(name)) {                 millSleep();                 System.out.println("新娘新郎拥抱");                 phaser.arriveAndAwaitAdvance();             } else {                 phaser.arriveAndDeregister();             }         }     } } 

Exchanger

用于线程之间交换数据,exchange()方法是阻塞的,所以要两个exchange行为都执行到才会触发交换。

package git.snippets.juc;  import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit;  /**  * Exchanger用于两个线程之间交换变量  */ public class ExchangerUsage {     static Exchanger<String> semaphore = new Exchanger<>();      public static void main(String[] args) {          new Thread(() -> {             String s = "T1";             try {                 s = semaphore.exchange(s);                 TimeUnit.SECONDS.sleep(2);                 System.out.println("Thread 1(T1) executed, Result is " + s);             } catch (Exception e) {                 e.printStackTrace();             }         }).start();          new Thread(() -> {             String s = "T2";             try {                 s = semaphore.exchange(s);                 TimeUnit.SECONDS.sleep(2);                 System.out.println("Thread 2(T2) executed, Result is " + s);             } catch (Exception e) {                 e.printStackTrace();             }         }).start();     } } 

LockSupport

其他锁的底层用的是AQS

原先让线程等待需要wait/await,现在仅需要LockSupport.park()

原先叫醒线程需要notify/notifyAll,现在仅需要LockSupport.unpark(), LockSupport.unpark()还可以叫醒指定线程,

示例代码:

package git.snippets.juc;  import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport;  /**  * 阻塞指定线程,唤醒指定线程  */ public class LockSupportUsage {     public static void main(String[] args) {         Thread t = new Thread(() -> {             for (int i = 0; i < 10; i++) {                 try {                     if (i == 5) {                         LockSupport.park();                     }                     if (i == 8) {                         LockSupport.park();                     }                     TimeUnit.SECONDS.sleep(1);                     System.out.println(i);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         });         t.start();         // unpark可以先于park调用         //LockSupport.unpark(t);         try {             TimeUnit.SECONDS.sleep(8);         } catch (InterruptedException e) {             e.printStackTrace();         }          LockSupport.unpark(t);         System.out.println("after 8 seconds");     } } 

实现一个监控元素的容器

实现一个容器,提供两个方法

// 向容器中增加一个元素 void add(T t); // 返回容器大小 int size(); 

有两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束

方法 1. 使用wait + notify实现

方法 2. 使用CountDownLatch实现

方法 3. 使用LockSupport实现

代码如下:

package git.snippets.juc;  import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport;  // 实现一个容器,提供两个方法,add,size,有两个线程, // 线程1添加10个元素到容器中, // 线程2实现监控元素的个数, // 当个数到5个时,线程2给出提示并结束 public class MonitorContainer {      public static void main(String[] args) {          useLockSupport();         // useCountDownLatch();         // useNotifyAndWait();     }       /**      * 使用LockSupport      */     private static void useLockSupport() {         System.out.println("use LockSupport...");         Thread adder;         List<Object> list = Collections.synchronizedList(new ArrayList<>());         Thread finalMonitor = new Thread(() -> {             LockSupport.park();             if (match(list)) {                 System.out.println("filled 5 elements size is " + list.size());                 LockSupport.unpark(null);             }         });         adder = new Thread(() -> {             for (int i = 0; i < 10; i++) {                 increment(list);                 if (match(list)) {                     LockSupport.unpark(finalMonitor);                 }             }         });         adder.start();         finalMonitor.start();     }      /**      * 使用CountDownLatch      */     private static void useCountDownLatch() {         System.out.println("use CountDownLatch...");         List<Object> list = Collections.synchronizedList(new ArrayList<>());         CountDownLatch latch = new CountDownLatch(5);         Thread adder = new Thread(() -> {             for (int i = 0; i < 10; i++) {                 increment(list);                 if (i <= 4) {                     latch.countDown();                 }             }         });         Thread monitor = new Thread(() -> {             try {                 latch.await();             } catch (InterruptedException e) {                 e.printStackTrace();             }             if (match(list)) {                 System.out.println("filled 5 elements");             }         });         adder.start();         monitor.start();     }      /**      * notify + wait 实现      */     private static void useNotifyAndWait() {         System.out.println("use notify and wait...");         List<Object> list = Collections.synchronizedList(new ArrayList<>());         final Object o = new Object();         Thread adder = new Thread(() -> {             synchronized (o) {                 for (int i = 0; i < 10; i++) {                     increment(list);                     if (match(list)) {                         o.notify();                         try {                             o.wait();                         } catch (InterruptedException e) {                             e.printStackTrace();                         }                     }                 }                 System.out.println("add finished");                 o.notify();             }         });         Thread monitor = new Thread(() -> {             synchronized (o) {                 if (match(list)) {                     System.out.println("5 elements added " + list.size());                     o.notify();                     try {                         o.wait();                         System.out.println("monitor finished");                         o.notify();                     } catch (InterruptedException e) {                         e.printStackTrace();                     }                  }             }         });         adder.start();         monitor.start();     }      /**      * 只要是5的倍数,就循环打印      */     private static void useNotifyAndWaitLoop() {         List<Object> list = Collections.synchronizedList(new ArrayList<>());         final Object o = new Object();         Thread adder = new Thread(() -> {             synchronized (o) {                 for (; ; ) {                     increment(list);                     if (match(list)) {                         o.notify();                         try {                             o.wait();                         } catch (InterruptedException e) {                             e.printStackTrace();                         }                     }                 }             }         });         Thread monitor = new Thread(() -> {             synchronized (o) {                 while (true) {                     if (match(list)) {                         System.out.println("filled 5 elements");                     }                     o.notify();                     try {                         o.wait();                     } catch (InterruptedException e) {                         e.printStackTrace();                     }                 }             }         });         adder.start();         monitor.start();     }      private static void increment(List<Object> list) {         try {             TimeUnit.SECONDS.sleep(1);         } catch (InterruptedException e1) {             e1.printStackTrace();         }         list.add(new Object());         System.out.println("list add the ele, size is " + list.size());     }      private static boolean match(List<Object> list) {         return list.size() % 5 == 0;     } } 

生产者消费者问题

写一个固定容量的同步容器,拥有putget方法,以及getCount方法,能够支持 2 个生产者线程以及 10 个消费者线程的阻塞调用。

方法 1. 使用wait/notifyAll

方法 2. ReentrantLockCondition,本质就是等待队列

package git.snippets.juc;  import java.util.LinkedList; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;  // 写一个固定容量的同步容器,拥有put和get方法,以及getCount方法,能够支持2个生产者线程以及10个消费者线程的阻塞调用。 public class ProducerAndConsumer {     public static void main(String[] args) {         // MyContainerByCondition container = new MyContainerByCondition(100);         MyContainerByNotifyAndWait container = new MyContainerByNotifyAndWait(100);         for (int i = 0; i < 25; i++) {             new Thread(container::get).start();         }         for (int i = 0; i < 20; i++) {             new Thread(() -> container.put(new Object())).start();         }     } }  // 使用ReentrantLock的Condition class MyContainerByCondition {     static ReentrantLock lock = new ReentrantLock();     final int MAX;     private final LinkedList<Object> list = new LinkedList<>();     Condition consumer = lock.newCondition();     Condition producer = lock.newCondition();      public MyContainerByCondition(int limit) {         this.MAX = limit;     }      public void put(Object object) {         lock.lock();         try {             while (getCount() == MAX) {                 System.out.println("container is full");                 try {                     producer.await();                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }             list.add(object);             consumer.signalAll();             System.out.println("contain add a object, current size " + getCount());          } finally {             lock.unlock();         }      }      public Object get() {         lock.lock();         try {             while (getCount() == 0) {                 try {                     System.out.println("container is empty");                     consumer.await();                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }             Object object = list.removeFirst();             producer.signalAll();              System.out.println("contain get a object, current size " + getCount());             return object;         } finally {             lock.unlock();         }      }      public synchronized int getCount() {         return list.size();     } }  // 使用synchronized的wait和notifyAll class MyContainerByNotifyAndWait {     LinkedList<Object> list = null;     final int limit;      MyContainerByNotifyAndWait(int limit) {         this.limit = limit;         list = new LinkedList<>();     }      synchronized int getCount() {         return list.size();     }      // index 从0开始计数     synchronized Object get() {         while (list.size() == 0) {             System.out.println("container is empty");             try {                 this.wait();             } catch (InterruptedException e) {                 e.printStackTrace();             }         }         Object o = list.removeFirst();          System.out.println("get a data");         this.notifyAll();         return o;     }      synchronized void put(Object data) {         while (list.size() > limit) {             System.out.println("container is full , do not add any more");             try {                 this.wait();             } catch (InterruptedException e) {                 e.printStackTrace();             }         }         list.add(data);          System.out.println("add a data");         this.notifyAll();     } } 

说明

本文涉及到的所有代码和图例

图例

代码

更多内容见:Java 多线程

参考资料

实战Java高并发程序设计(第2版)

深入浅出Java多线程

多线程与高并发-马士兵

Java并发编程实战

Java中的共享锁和排他锁(以读写锁ReentrantReadWriteLock为例)

【并发编程】面试官:有没有比读写锁更快的锁?

图解Java多线程设计模式

发表评论

评论已关闭。

相关文章

当前内容话题