Java并发(二十三)—-同步模式之保护性暂停

1、定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject

  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列

  • JDK 中,join 的实现、Future 的实现,采用的就是此模式

  • 因为要等待另一方的结果,因此归类到同步模式

Java并发(二十三)----同步模式之保护性暂停

2、实现

class GuardedObject { ​     // 结果     private Object response;     private final Object lock = new Object(); ​     // 获取结果     public Object get() {         synchronized (lock) {             // 条件不满足则等待             while (response == null) {                 try {                     lock.wait();                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }             return response;         }     } ​     // 产生结果     public void complete(Object response) {         synchronized (lock) {             // 条件满足,通知等待线程             this.response = response;             lock.notifyAll();         }     } }

3、应用

一个线程等待另一个线程的执行结果

public static void main(String[] args) {     GuardedObject guardedObject = new GuardedObject();     new Thread(() -> {         try {             // 子线程执行下载             List<String> response = download(); // 模拟下载操作             log.debug("download complete...");             guardedObject.complete(response);         } catch (IOException e) {             e.printStackTrace();         }     }).start(); ​     log.debug("waiting...");     // 主线程阻塞等待     Object response = guardedObject.get();     log.debug("get response: [{}] lines", ((List<String>) response).size()); ​ }

执行结果

08:42:18.568 [main] c.TestGuardedObject - waiting... 08:42:23.312 [Thread-0] c.TestGuardedObject - download complete... 08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines

4、带超时版 GuardedObject

如果要控制超时时间呢

class GuardedObjectV2 { ​     private Object response;     private final Object lock = new Object(); ​     public Object get(long millis) {         synchronized (lock) {             // 1) 记录最初时间             long begin = System.currentTimeMillis();             // 2) 已经经历的时间             long timePassed = 0;             while (response == null) {                 // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等                 long waitTime = millis - timePassed;                 log.debug("waitTime: {}", waitTime);                 if (waitTime <= 0) {                     log.debug("break...");                     break;                 }                 try {                     lock.wait(waitTime);  // 注意这里并不是 mills,防止虚假唤醒                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 // 3) 如果提前被唤醒,这时已经经历的时间假设为 400                 timePassed = System.currentTimeMillis() - begin;                 log.debug("timePassed: {}, object is null {}",                            timePassed, response == null);             }             return response;         }     } ​     public void complete(Object response) {         synchronized (lock) {             // 条件满足,通知等待线程             this.response = response;             log.debug("notify...");             lock.notifyAll();         }     } }

测试,没有超时

public static void main(String[] args) {     GuardedObjectV2 v2 = new GuardedObjectV2();     new Thread(() -> {         sleep(1); // 睡眠1秒         v2.complete(null);         sleep(1);         v2.complete(Arrays.asList("a", "b", "c"));     }).start(); ​     Object response = v2.get(2500);     if (response != null) {         log.debug("get response: [{}] lines", ((List<String>) response).size());     } else {         log.debug("can't get response");     } }

输出

08:49:39.917 [main] c.GuardedObjectV2 - waitTime: 2500 08:49:40.917 [Thread-0] c.GuardedObjectV2 - notify... 08:49:40.917 [main] c.GuardedObjectV2 - timePassed: 1003, object is null true 08:49:40.917 [main] c.GuardedObjectV2 - waitTime: 1497 08:49:41.918 [Thread-0] c.GuardedObjectV2 - notify... 08:49:41.918 [main] c.GuardedObjectV2 - timePassed: 2004, object is null false 08:49:41.918 [main] c.TestGuardedObjectV2 - get response: [3] lines

测试,超时

// 等待时间不足 List<String> lines = v2.get(1500);

输出

08:47:54.963 [main] c.GuardedObjectV2 - waitTime: 1500 08:47:55.963 [Thread-0] c.GuardedObjectV2 - notify... 08:47:55.963 [main] c.GuardedObjectV2 - timePassed: 1002, object is null true 08:47:55.963 [main] c.GuardedObjectV2 - waitTime: 498 08:47:56.461 [main] c.GuardedObjectV2 - timePassed: 1500, object is null true 08:47:56.461 [main] c.GuardedObjectV2 - waitTime: 0 08:47:56.461 [main] c.GuardedObjectV2 - break... 08:47:56.461 [main] c.TestGuardedObjectV2 - can't get response 08:47:56.963 [Thread-0] c.GuardedObjectV2 - notify...

5、多任务版 GuardedObject

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员

如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理

Java并发(二十三)----同步模式之保护性暂停

新增 id 用来标识 Guarded Object

class GuardedObject { ​     // 标识 Guarded Object     private int id; ​     public GuardedObject(int id) {         this.id = id;     } ​     public int getId() {         return id;     } ​     // 结果     private Object response; ​     // 获取结果     // timeout 表示要等待多久 2000     public Object get(long timeout) {         synchronized (this) {             // 开始时间 15:00:00             long begin = System.currentTimeMillis();             // 经历的时间             long passedTime = 0;             while (response == null) {                 // 这一轮循环应该等待的时间                 long waitTime = timeout - passedTime;                 // 经历的时间超过了最大等待时间时,退出循环                 if (timeout - passedTime <= 0) {                     break;                 }                 try {                     this.wait(waitTime); // 虚假唤醒 15:00:01                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 // 求得经历时间                 passedTime = System.currentTimeMillis() - begin; // 15:00:02  1s             }             return response;         }     } ​     // 产生结果     public void complete(Object response) {         synchronized (this) {             // 给结果成员变量赋值             this.response = response;             this.notifyAll();         }     } }

中间解耦类

class Mailboxes {     private static Map<Integer, GuardedObject> boxes = new Hashtable<>(); ​     private static int id = 1;     // 产生唯一 id     private static synchronized int generateId() {         return id++;     } ​     public static GuardedObject getGuardedObject(int id) {         return boxes.remove(id);  // 注意这里的remove,防止堆溢出     } ​     public static GuardedObject createGuardedObject() {         GuardedObject go = new GuardedObject(generateId());         boxes.put(go.getId(), go);         return go;     } ​     public static Set<Integer> getIds() {         return boxes.keySet();     } }

业务相关类

class People extends Thread{     @Override     public void run() {         // 收信         GuardedObject guardedObject = Mailboxes.createGuardedObject();         log.debug("开始收信 id:{}", guardedObject.getId());         Object mail = guardedObject.get(5000);         log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);     } }
class Postman extends Thread {     private int id;     private String mail; ​     public Postman(int id, String mail) {         this.id = id;         this.mail = mail;     } ​     @Override     public void run() {         GuardedObject guardedObject = Mailboxes.getGuardedObject(id);         log.debug("送信 id:{}, 内容:{}", id, mail);         guardedObject.complete(mail);     } }

测试

public static void main(String[] args) throws InterruptedException {     for (int i = 0; i < 3; i++) {         new People().start();     }     Sleeper.sleep(1);// 睡眠1秒     for (Integer id : Mailboxes.getIds()) {         new Postman(id, "内容" + id).start();     } }

某次运行结果

10:35:05.689 c.People [Thread-1] - 开始收信 id:3 10:35:05.689 c.People [Thread-2] - 开始收信 id:1 10:35:05.689 c.People [Thread-0] - 开始收信 id:2 10:35:06.688 c.Postman [Thread-4] - 送信 id:2, 内容:内容2 10:35:06.688 c.Postman [Thread-5] - 送信 id:1, 内容:内容1 10:35:06.688 c.People [Thread-0] - 收到信 id:2, 内容:内容2 10:35:06.688 c.People [Thread-2] - 收到信 id:1, 内容:内容1 10:35:06.688 c.Postman [Thread-3] - 送信 id:3, 内容:内容3 10:35:06.689 c.People [Thread-1] - 收到信 id:3, 内容:内容3

 

发表评论

相关文章