简单设计一个JAVA并行处理工具类

在工作中,我们肯定遇到过一个接口要处理N多事项导致接口响应速度很慢的情况,通常我们会综合使用两种方式来提升接口响应速度

  1. 优化查询SQL,提升查询效率
  2. 开启多线程并发处理业务数据

这里讨论第二种方案:使用多线程并发处理业务数据,最后处理完成以后,拼装起来返回给前端,每个人的实现方案都不一样,我在工作的这几年也经历了几种写法。

一、几种常见的并行处理写法

方法一:Future写法

其代码形式如下

@Test public void test1() {     //定义线程池     ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 30,                     TimeUnit.SECONDS,                     new ArrayBlockingQueue<>(10),                     Executors.defaultThreadFactory(),                     new ThreadPoolExecutor.DiscardPolicy());     //异步执行     Future<String> getUserName = threadPoolExecutor.submit(() -> {         //do something...         return "kdyzm";     });     //异步执行     Future<Integer> getUserAge = threadPoolExecutor.submit(() -> {         //do something...         return 12;     });     //拼装回调结果     try {         UserInfo user = new UserInfo();         user.setName(getUserName.get());         user.setAge(getUserAge.get());         log.info(JsonUtils.toPrettyString(user));     } catch (InterruptedException | ExecutionException e) {         e.printStackTrace();     } }  @Data static class UserInfo {     private String name;     private Integer age; } 

多几个submit一起执行,最后集中get获取最终结果。

这种方式任务一旦多了,就会显得代码很乱,一堆的变量名会让代码可读性很差。

方法二:CompletableFuture.allOf写法

其代码形式如下

@Test public void test2() {     try {         UserInfo userInfo = new UserInfo();                  CompletableFuture.allOf(             	//异步执行                 CompletableFuture.runAsync(() -> {                     userInfo.setName("kdyzm");                 }),             	//异步执行                 CompletableFuture.runAsync(() -> {                     userInfo.setAge(12);                 })         //同步返回         ).get();          log.info(JsonUtils.toPrettyString(userInfo));     } catch (InterruptedException | ExecutionException e) {         e.printStackTrace();     } }  @Data static class UserInfo {     private String name;     private Integer age; } 

这种方法使用了CompletableFuture的API,通过将多个异步任务收集起来统一调度最后通过一个get方法同步到主线程。比直接使用Future简化了些。

方法三:CompletableFuture::join写法

其代码形式如下

@Test public void test3(){     UserInfo userInfo = new UserInfo();     Arrays.asList( 			//异步执行             CompletableFuture.supplyAsync(()->{                 return "kdyzm";             //回调执行             }).thenAccept(name->{                 userInfo.setName(name);             }),          	//异步执行             CompletableFuture.supplyAsync(()->{                 return 12;             //回调执行             }).thenAccept(age->{                 userInfo.setAge(age);             })                  //等待所有线程执行完毕     ).forEach(CompletableFuture::join);      log.info(JsonUtils.toPrettyString(userInfo));  }  @Data static class UserInfo {     private String name;     private Integer age; } 

这种写法和上面的写法相比具有更高的可读性,但是它也有缺点:thenAccept只能接收一个返回值,如果想处理多个值,则没有办法,只能使用方法2。

总结

几种写法中第二、三种写法比较常见,使用起来也更加方便,两者各有优缺点:方法2能处理多个返回值,方法3可读性更高。但是无论是方法2还是方法3,它们的使用总是要记住相关的API,使用起来总不是很顺手,可读性虽然方法3更强一些,但是总还是差点意思。此时我就有了自己设计一个简单的并行处理工具类的想法,既要易用,还要可读性高。

二、并行处理工具类设计

1、设计模式选型

因为平时比较喜欢链式调用的API,所以一开始一开始设计,我就想用建造者模式来实现这个工具类。关于建造者模式,详情可以看我之前的文章:设计模式(六):建造者模式 。建造者模式在实际应用中的特点就是链式调用,无论是StringBuilder还是lombok的@Data注解,都使用了建造者模式。

2、第一版代码

仿照方法三,我开发了第一版代码

import lombok.Data; import lombok.extern.slf4j.Slf4j;  import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier;  /**  * @author kdyzm  */ @Slf4j public class ConcurrentWorker {      private List<Task> workers = new ArrayList<>();      public static ConcurrentWorker runner() {         return new ConcurrentWorker();     }      public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) {         Task<R> worker = new Task<>(action, value);         this.workers.add(worker);         return this;     }      public void run() {         workers.forEach(item -> {             CompletableFuture completableFuture = CompletableFuture.supplyAsync(item.getValue());             item.setCompletableFuture(completableFuture);         });         workers                 .stream()                 .map(                         item -> {                             return item.completableFuture.thenAccept(item.getAction());                         }                 )                 .forEach(CompletableFuture::join);     }      @Data     public static class Task<R> {         private Consumer<? super R> action;         private Supplier<R> value;         private CompletableFuture<R> completableFuture;          public Task(Consumer<? super R> action, Supplier<R> value) {             this.action = action;             this.value = value;         }     } } 

这段代码一共不到60行,使用了Lambda表达式和函数式编程相关的API对方法三进行改造,最终使用效果如下

@Test     public void test() {          UserInfo userInfo = new UserInfo();          ConcurrentWorker.runner()             	//添加任务                 .addTask(userInfo::setName, () -> {                     //延迟1000毫秒打印线程执行情况                     try {                         Thread.sleep(1000);                     } catch (InterruptedException e) {                         e.printStackTrace();                     }                     log.info(Thread.currentThread().getName()+"-name");                     return "张三";                 })             	//添加任务                 .addTask(userInfo::setAge, () -> {                     //延迟1000毫秒打印线程执行情况                     try {                         Thread.sleep(1000);                     } catch (InterruptedException e) {                         e.printStackTrace();                     }                     log.info(Thread.currentThread().getName()+"-age");                     return 13;                 })             	//执行任务                 .run();         log.info(JsonUtils.toPrettyString(userInfo));     }      @Data     static class UserInfo {         private String name;         private Integer age;         private String sex;     } 

它的使用方式就是

ConcurrentWorker.runner()                 .addTask(setter function, return_value function )     			.addTask(setter function, return_value function)     			.run() 

可以看到易用性够了,可读性也很好,但是它的缺点和方法三一样,都只能接收一个参数,毕竟它是根据方法3封装的,接下来改造代码让它支持多参数处理。

3、第二版代码

已知,第一版代码已经支持了如下形式的功能

ConcurrentWorker.runner()                 .addTask(setter function, return_value function )     			.addTask(setter function, return_value function)     			.run() 

现在我想添加以下形式的重载方法

.addTask(handle function) 

没错,就一个参数,在这个方法中可以任意设置对象值。最终使用的效果如下

@Test public void test() {      UserInfo userInfo = new UserInfo();      ConcurrentWorker.runner()             .addTask(userInfo::setName, () -> {                 try {                     Thread.sleep(1000);                     log.info(Thread.currentThread().getName());                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 log.info(Thread.currentThread().getName()+"-name");                 return "张三";             })             .addTask(userInfo::setAge, () -> {                 try {                     Thread.sleep(1000);                     log.info(Thread.currentThread().getName());                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 log.info(Thread.currentThread().getName()+"-age");                 return 13;             })         	//新方法:处理任意多属性值填充             .addTask(()->{                 try {                     Thread.sleep(1000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }                 log.info(Thread.currentThread().getName()+"-sex");                 userInfo.setSex("男");             })             .run();     log.info(JsonUtils.toPrettyString(userInfo)); }  @Data static class UserInfo {     private String name;     private Integer age;     private String sex; } 

完整工具类方法如下

import lombok.Data; import lombok.extern.slf4j.Slf4j;  import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier;  /**  * @author kdyzm  */ @Slf4j public class ConcurrentWorker {      private List<Task> workers = new ArrayList<>();      public static ConcurrentWorker runner() {         return new ConcurrentWorker();     }      public <R> ConcurrentWorker addTask(Consumer<? super R> action, Supplier<R> value) {         Task<R> worker = new Task<>(action, value);         this.workers.add(worker);         return this;     }      public <R> ConcurrentWorker addTask(Runnable runnable) {         Task<R> worker = new Task<>(runnable);         this.workers.add(worker);         return this;     }      public void run() {         workers.forEach(item -> {             int taskType = item.getTaskType();             CompletableFuture completableFuture = null;             switch (taskType) {                 case TaskType.RETURN_VALUE:                     completableFuture = CompletableFuture.supplyAsync(item.getValue());                     break;                 case TaskType.VOID_RETURN:                     completableFuture = CompletableFuture.runAsync(item.getRunnable());                     break;                 default:                     break;             }             item.setCompletableFuture(completableFuture);         });         workers                 .stream()                 .map(                         item -> {                             int taskType = item.getTaskType();                             switch (taskType) {                                 case TaskType.RETURN_VALUE:                                     return item.completableFuture.thenAccept(item.getAction());                                 default:                                     return item.completableFuture.thenAccept(temp->{                                         //空                                     });                             }                         }                 )                 .forEach(CompletableFuture::join);     }      @Data     public static class Task<R> {         private Consumer<? super R> action;         private Supplier<R> value;         private CompletableFuture<R> completableFuture;         private Runnable runnable;         private int taskType;          public Task(Consumer<? super R> action, Supplier<R> value) {             this.action = action;             this.value = value;             this.taskType = TaskType.RETURN_VALUE;         }           public Task(Runnable runnable) {             this.runnable = runnable;             this.taskType = TaskType.VOID_RETURN;         }     }       public static class TaskType {          /**          * 有返回值的          */         public static final int RETURN_VALUE = 1;          /**          * 没有返回值的          */         public static final int VOID_RETURN = 2;     } } 

我将任务类型分为两种,并使用TaskType类封装成常量值:1表示任务执行回调有返回值;2表示任务执行没有返回值,属性填充将在任务执行过程中完成,该类型任务使用Runnable接口实现。

4、工具类jar包

相关代码我已经打包成jar包上传到maven中央仓库,可以通过引入以下maven依赖使用ConcurrentWorker工具类

<dependency>     <groupId>cn.kdyzm</groupId>     <artifactId>kdyzm-util</artifactId>     <version>0.0.2</version> </dependency> 

最后,欢迎关注我的博客:https://blog.kdyzm.cn

END.

发表评论

评论已关闭。

相关文章