JUC并发—13.Future模式和异步编程简介

大纲

1.Runnable接口与Callable接口

(1)Runnable接口实现异步任务

(2)Callable接口实现异步任务

2.Future模式

(1)Future模式的概念

(2)Future接口的使用

(3)FutureTask类的使用

3.CompletableFuture的使用和异步编程

(1)使用Future时的问题

(2)CompletableFuture的使用例子

(3)CompletableFuture的使用场景

(4)CompletableFuture的创建异步任务

(5)CompletableFuture的简单任务异步回调

(6)CompletableFuture的多个任务组合处理

(7)CompletableFuture的使用注意事项

 

1.Runnable接口与Callable接口

(1)Runnable接口实现异步任务

(2)Callable接口实现异步任务

 

(1)Runnable接口实现异步任务

也就是通过创建实现了Runnable接口的Thread线程来实现异步任务。

 

Runnable接口实现的异步任务存在的问题:

一.Runnable接口不支持获取返回值

二.Runnable接口不支持抛出异常

@FunctionalInterface public interface Runnable {     public abstract void run(); }  public class Thread implements Runnable {     ...     private Runnable target;          public Thread() {         init(null, null, "Thread-" + nextThreadNum(), 0);     }          @Override     public void run() {         if (target != null) {             target.run();         }     }     ... }  @RunWith(SpringRunner.class) @SpringBootTest public class Test {     @Test     public void testNewThread() {         Thread t1 = new Thread(new Runnable() {             @Override             public void run() {                 System.out.println("实现的异步任务");             }         });         t1.start();     } }

(2)Callable接口实现异步任务

Callable接口需要与Future和ExecutorService结合使用:通过ExecutorService的submit()方法提交一个实现Callable接口的任务,然后ExecutorService的submit()方法会返回一个实现Future接口的对象,接着调用Future接口的get()方法就可以获取异步任务的结果。

public interface ExecutorService extends Executor {     ...     //Submits a value-returning task for execution and returns a Future representing the pending results of the task.      //The Future's get method will return the task's result upon successful completion.     //@param task the task to submit     //@param <T> the type of the task's result     //@return a Future representing pending completion of the task     <T> Future<T> submit(Callable<T> task);     ... }  public interface Future<V> {     ...     //Waits if necessary for the computation to complete, and then retrieves its result.     V get() throws InterruptedException, ExecutionException;          //Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.     V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;     ... }

 

2.Future模式

(1)Future模式的概念

(2)Future接口的使用

(3)FutureTask类的使用

 

(1)Future模式的概念

当前线程有一个任务,提交给了Future,由Future来完成这个任务,在此期间当前线程可以处理其他事情了。一段时间后,当前线程就可以从Future中获取结果。

 

(2)Future接口的使用

一.Future接口源码

二.普通模式计算1000次1到1亿的和

三.Future模式计算1000次1到1亿的和

 

一.Future接口源码

Future就是对实现Runnable或Callable接口的任务进行查询、中断、获取。

public interface Future<V> {     //用来取消任务,取消成功则返回true,取消失败则返回false     //mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务     //如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false     //如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false     //如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true     boolean cancel(boolean mayInterruptIfRunning);      //表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true     boolean isCancelled();      //表示任务是否已经完成,若任务完成,则返回true     boolean isDone();      //获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果     V get() throws InterruptedException, ExecutionException;      //获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException     V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

二.普通模式计算1000次1到1亿的和

public class NormalTest {     //普通模式计算1000次1到1亿的和     public static void main(String[] args) {         long start = System.currentTimeMillis();         List<Integer> retList = new ArrayList<>();         //计算1000次1至1亿的和         for(int i = 0; i < 1000; i++) {             retList.add(Calc.cal(100000000));         }         System.out.println("耗时: " + (System.currentTimeMillis() - start));         for (int i = 0; i < 1000; i++) {             try {                 Integer result = retList.get(i);                 System.out.println("第" + i + "个结果: " + result);             } catch (Exception e) {                       }         }         System.out.println("耗时: " + (System.currentTimeMillis() - start));     }      public static class Calc implements Callable<Integer> {         @Override         public Integer call() throws Exception {             return cal(10000);         }         public static int cal (int num) {             int sum = 0;             for (int i = 0; i < num; i++) {                 sum += i;             }             return sum;         }     } }  -------------------------------------------------- 执行结果: 耗时: 43659 第0个结果: 887459712 第1个结果: 887459712 第2个结果: 887459712 ... 第999个结果: 887459712 耗时: 43688

三.Future模式计算1000次1到1亿的和

public class FutureTest {     //Future模式计算1000次1到1亿的和     public static void main(String[] args) {         long start = System.currentTimeMillis();         ExecutorService executorService = Executors.newCachedThreadPool();         List<Future<Integer>> futureList = new ArrayList<>();         //计算1000次1至1亿的和         for (int i = 0; i < 1000; i++) {             //调度执行             futureList.add(executorService.submit(new Calc()));         }         System.out.println("耗时: " + (System.currentTimeMillis() - start));         for (int i = 0; i < 1000; i++) {             try {                 Integer result = futureList.get(i).get();                 System.out.println("第" + i + "个结果: " + result);             } catch (InterruptedException | ExecutionException e) {             }         }         System.out.println("耗时: " + (System.currentTimeMillis() - start));     }      public static class Calc implements Callable<Integer> {         @Override         public Integer call() throws Exception {             return cal(100000000);         }          public static int cal (int num) {             int sum = 0;             for (int i = 0; i < num; i++) {                 sum += i;             }             return sum;         }     } }  -------------------------------------------------- 执行结果: 耗时: 12058 第0个结果: 887459712 第1个结果: 887459712 ... 第999个结果: 887459712 耗时: 12405

(3)FutureTask类的使用

一.FutureTask类的简介

二.Callable + Future获取异步任务的执行结果

三.Callable + FutureTask获取异步任务的结果

 

一.FutureTask类的简介

FutureTask类实现了RunnableFuture接口,而RunnableFuture接口又继承了Runnable接口和Future接口。所以FutureTask类既可以作为Runnable被线程执行,又可以作为Future得到Callable的run()方法的返回值。同时,FutureTask类是Future接口的唯一实现类。

public class FutureTask<V> implements RunnableFuture<V> {     ...     ... }  public interface RunnableFuture<V> extends Runnable, Future<V> {     void run(); }

二.Callable + Future获取异步任务的执行结果

//Callable+Future获取执行结果 public class FutureTest {     public static void main(String[] args) {     		ExecutorService executor = Executors.newCachedThreadPool();         Task task = new Task();         Future<Integer> result = executor.submit(task);         executor.shutdown();         try {             Thread.sleep(1000);         } catch (InterruptedException e1) {             e1.printStackTrace();         }         System.out.println("主线程在执行任务");         try {             System.out.println("task运行结果" + result.get());         } catch (InterruptedException | ExecutionException e) {             e.printStackTrace();         }         System.out.println("所有任务执行完毕");     } }  class Task implements Callable<Integer> {     @Override     public Integer call() throws Exception {         System.out.println("子线程在进行计算");         Thread.sleep(3000);         int sum = 0;         for (int i = 0; i < 100; i++) {             sum += i;         }         return sum;     } }

三.Callable + FutureTask获取异步任务的结果

//Callable + FutureTask获取执行结果 public class FutureTest {     public static void main(String[] args) {         //第一种方式         ExecutorService executor = Executors.newCachedThreadPool();         Task task = new Task();         FutureTask<Integer> futureTask = new FutureTask<>(task);         executor.submit(futureTask);         executor.shutdown();          //第二种方式         //注意这种方式和第一种方式效果是类似的,只不过之前使用的是ExecutorService,现在使用的是Thread         //Task task = new Task();         //FutureTask<Integer> futureTask = new FutureTask<Integer>(task);         //Thread thread = new Thread(futureTask);         //thread.start();          try {             Thread.sleep(1000);         } catch (InterruptedException e1) {             e1.printStackTrace();         }         System.out.println("主线程在执行任务");         try {             System.out.println("task运行结果" + futureTask.get());         } catch (InterruptedException | ExecutionException e) {             e.printStackTrace();         }         System.out.println("所有任务执行完毕");     } }  class Task implements Callable<Integer>{     @Override     public Integer call() throws Exception {         System.out.println("子线程在进行计算");         Thread.sleep(3000);         int sum = 0;         for(int i=0;i<100;i++)             sum += i;         return sum;     } }

 

3.CompletableFuture的使用和异步编程

(1)使用Future时的问题

(2)CompletableFuture的使用例子

(3)CompletableFuture的使用场景

(4)CompletableFuture的创建异步任务

(5)CompletableFuture的简单任务异步回调

(6)CompletableFuture的多个任务组合处理

(7)CompletableFuture的使用注意事项

 

(1)使用Future时的问题

一.通过Future获取结果演示

如果主线程需要执行一个很耗时的计算任务,那么可通过Future把该任务放到异步线程执行,让主线程继续处理其他任务。当这个耗时的任务处理完成后,再让主线程通过Future获取计算结果。

 

如下所示,有两个服务:

public class UserInfoService {     public UserInfo getUserInfo(Long userId) throws InterruptedException {         Thread.sleep(300);//模拟调用耗时         return new UserInfo("...");//一般是查数据库,或者远程调用返回     } }  public class OrderService {     public OrderInfo getOrderInfo(long userId) throws InterruptedException {         Thread.sleep(500);//模拟调用耗时         return new OrderInfo("...");     } }

接下来,演示在主线程中是如何使用Future来进行异步调用的。

public class FutureTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         ExecutorService executorService = Executors.newFixedThreadPool(10);         UserInfoService userInfoService = new UserInfoService();         OrderService orderService = new OrderService();         long userId = 1L;         long startTime = System.currentTimeMillis();                  //调用用户服务获取用户基本信息         FutureTask<UserInfo> userInfoFutureTask = new FutureTask<>(new Callable<UserInfo>() {             @Override             public UserInfo call() throws Exception {                 return userInfoService.getUserInfo(userId);             }         });         //提交任务给线程池异步执行         executorService.submit(userInfoFutureTask);                  //模拟主线程其它操作耗时         Thread.sleep(300);                  //调用订单服务获取用户订单信息         FutureTask<OrderInfo> orderInfoFutureTask = new FutureTask<>(new Callable<OrderInfo>() {             @Override             public MedalInfo call() throws Exception {                 return medalService.getMedalInfo(userId);             }         });         //提交任务给线程池异步执行         executorService.submit(medalInfoFutureTask);                  UserInfo userInfo = userInfoFutureTask.get();//获取用户信息结果         OrderInfo orderInfo = orderInfoFutureTask.get();//获取订单信息结果         System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");     } } //总共用时806ms

如果不使用Future进行并行异步调用,而是在主线程串行执行,那么耗时大约为300 + 500 + 300 = 1100ms。

 

二.Future获取结果时存在的问题

可见,Future + 线程池异步配合,提高了程序的执行效率。但是由于根据Future获取结果的方式不是很友好,所以只能通过阻塞或者轮询的方式来得到任务的结果。

 

方式一:

通过Future提供的get()方法,进行阻塞调用。在主线程获取到异步任务的执行结果前,get()方法会一直阻塞。

 

方式二:

通过Future提供的isDone()方法,进行轮询调用。可以让主线程在程序中轮询isDone()方法来查询异步任务的执行结果。

 

阻塞的方式会违背异步编程的理念,轮询的方式又会空耗CPU资源,因此JDK8设计出了CompletableFuture。

 

CompletableFuture提供了一种类似观察者模式的机制,可以让异步任务执行完成后通知主线程。

 

三.使用Future的问题总结

首先需要单独创建一个线程池来提交Callable任务。然后如果使用Future的get()方法获取结果,那么需要进行阻塞调用。如果使用Future的isDone()方法获取结果,那么需要进行轮询调用。

 

(2)CompletableFuture的使用例子

如下所示,使用CompletableFuture,代码简洁了很多。CompletableFuture的supplyAsync()方法,提供了异步执行的功能,线程池也不用单独创建了,实际上使用的默认线程池是ForkJoinPool.commonPool。

public class CompletableFutureTest {     public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {         UserInfoService userInfoService = new UserInfoService();         OrderService orderService = new OrderService();         long userId = 1L;         long startTime = System.currentTimeMillis();                  //调用用户服务获取用户基本信息         CompletableFuture<UserInfo> completableUserInfoFuture =             CompletableFuture.supplyAsync(() -> userInfoService.getUserInfo(userId));                  //模拟主线程其它操作耗时         Thread.sleep(300);                  //调用订单服务获取用户订单信息         CompletableFuture<OrderInfo> completableOrderInfoFuture =              CompletableFuture.supplyAsync(() -> orderService.getOrderInfo(userId));          //获取个人信息结果         UserInfo userInfo = completableUserInfoFuture.get();         //获取订单信息结果         OrderInfo orderInfo = completableOrderInfoFuture.get();         System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");     } }

(3)CompletableFuture的使用场景

一.创建异步任务

二.简单任务异步回调

三.多个任务组合处理

 

(4)CompletableFuture的创建异步任务

CompletableFuture创建异步任务的方法:supplyAsync()和runAsync();

JUC并发—13.Future模式和异步编程简介

一.supplyAsync()方法执行CompletableFuture任务,有返回值。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {     ...     //Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool#commonPool()      //with the value obtained by calling the given Supplier.     //@param supplier a function returning the value to be used to complete the returned CompletableFuture     //@param <U> the function's return type     //@return the new CompletableFuture     //使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {         return asyncSupplyStage(asyncPool, supplier);     }      //Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor      //with the value obtained by calling the given Supplier.     //@param supplier a function returning the value to be used to complete the returned CompletableFuture     //@param executor the executor to use for asynchronous execution     //@param <U> the function's return type     //@return the new CompletableFuture     //使用自定义的线程池,根据supplier构建执行任务     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {         return asyncSupplyStage(screenExecutor(executor), supplier);     }     ... }

二.runAsync()方法执行CompletableFuture任务,没有返回值。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {     ...     //Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool#commonPool()      //after it runs the given action.     //@param runnable the action to run before completing the returned CompletableFuture     //@return the new CompletableFuture     //使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务     public static CompletableFuture<Void> runAsync(Runnable runnable) {         return asyncRunStage(asyncPool, runnable);     }      //Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor      //after it runs the given action.     //@param runnable the action to run before completing the returned CompletableFuture     //@param executor the executor to use for asynchronous execution     //@return the new CompletableFuture     //使用自定义的线程池,根据runnable构建执行任务     public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {         return asyncRunStage(screenExecutor(executor), runnable);     }     ... }

使用示例如下:

public class CompletableFuture {     public static void main(String[] args) {         //自定义线程池         ExecutorService executor = Executors.newCachedThreadPool();                  //runAsync的使用,没有返回值         CompletableFuture<Void> runFuture =             CompletableFuture.runAsync(() -> System.out.println("没有返回值"), executor);         //supplyAsync的使用,有返回值         CompletableFuture<String> supplyFuture =             CompletableFuture.supplyAsync(() -> { System.out.print("有返回值"); return "OK"; }, executor);                  //runAsync的future没有返回值,输出null         System.out.println(runFuture.join());         //supplyAsync的future,有返回值         System.out.println(supplyFuture.join());                  //关闭线程池         executor.shutdown();     } }

(5)CompletableFuture的简单任务异步回调

JUC并发—13.Future模式和异步编程简介

一.thenRun()和thenRunAsync()方法

CompletableFuture的thenRun()方法就是:执行完第一个任务后,再执行第二个任务。也就是当某个任务执行完成后,会执行设置给该任务的回调方法。但是前后两个任务没有传递参数,第二个任务也没有返回值。

 

thenRun()和thenRunAsync()方法的区别是:如果执行第一个任务时,传入了一个自定义线程池。当调用thenRun()方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。当调用thenRunAsync()方法执行第二个任务时,则第一个任务使用传入的线程池,第二个任务使用ForkJoin线程池。也就是说,thenRunAsync()会使用ForkJoin线程池来异步执行任务。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {     private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();     ...     public CompletableFuture<Void> thenRun(Runnable action) {         return uniRunStage(null, action);     }     public CompletableFuture<Void> thenRunAsync(Runnable action) {         return uniRunStage(asyncPool, action);     }     ... }  public class FutureThenRunTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {             System.out.println("先执行第一个任务");             return "第一个任务执行完成";         });         CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {             System.out.println("接着执行第二个任务");         });         System.out.println("输出" + thenRunFuture.get());     } }  //执行程序输出的结果如下: //先执行第一个任务 //接着执行第二个任务 //输出null

二.thenAccept()和thenAcceptAsync()方法

CompletableFuture的thenAccept()方法表示:第一个任务执行完成后,执行第二个任务(回调方法)时,会将第一个任务的执行结果作为入参,传递到第二个任务中,但是第二个任务是没有返回值的。

 

CompletableFuture的thenAcceptAsync()方法会使用ForkJoin线程池来异步执行任务。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {     private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();     ...     public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {         return uniAcceptStage(null, action);     }     public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {         return uniAcceptStage(asyncPool, action);     }     ... }  public class FutureThenAcceptTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {             System.out.println("执行第一个任务");             return "第一个任务的返回值";         });         CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {             System.out.println("执行第二个任务");             if ("第一个任务的返回值".equals(a)) {                 System.out.println("收到传入的第一个任务的返回值");             }         });         System.out.println("输出" + thenAcceptFuture.get());     } }  //执行程序输出的结果如下: //执行第一个任务 //执行第二个任务 //收到传入的第一个任务的返回值 //输出null

三.thenApply()和thenApplyAsync()方法

CompletableFuture的thenApply()方法表示:第一个任务执行完成后,执行第二个任务(回调方法)时,会将第一个任务的执行结果作为入参,传递到第二个任务中,并且第二个任务是有返回值的。

 

CompletableFuture的thenApplyAsync()方法会使用ForkJoin线程池来异步执行任务。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {     private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();     ...     public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {         return uniApplyStage(null, fn);     }     public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {         return uniApplyStage(asyncPool, fn);     }     ... }  public class FutureThenApplyTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {             System.out.println("执行第一个任务");             return "第一个任务的返回值";         });         CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {             if ("第一个任务的返回值".equals(a)) {                 System.out.println("收到传入的第一个任务的返回值");                 System.out.println("执行第二个任务");                 return "第二个任务的返回值";             }             return "第二个任务的返回值";         });         System.out.println("输出" + thenApplyFuture.get());     } }  //执行程序输出的结果如下: //执行第一个任务 //收到传入的第一个任务的返回值 //执行第二个任务 //输出第二个任务的返回值

四.exceptionally()方法

CompletableFuture的exceptionally()方法表示:某个任务执行异常时,才执行的回调方法。并且将抛出的异常作为参数,传递到回调方法中。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {     ...     public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {         return uniExceptionallyStage(fn);     }     ... }  public class FutureExceptionTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {             System.out.println("异步执行任务时抛出异常");             throw new RuntimeException();         });         CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {             e.printStackTrace();             return "返回处理异步执行任务抛出的异常的结果";         });         System.out.println(exceptionFuture.get());     } }  //执行程序输出的结果如下: //异步执行任务时抛出异常 //java.util.concurrent.CompletionException: java.lang.RuntimeException //返回处理异步执行任务抛出的异常的结果

五.whenComplete()和whenCompleteAsync()

CompletableFuture的whenComplete()方法表示:某个任务执行完成后,紧接着执行的回调方法无返回值。whenComplete()方法返回的CompletableFuture的result是上个任务的结果。

 

CompletableFuture的whenCompleteAsync()方法会使用ForkJoin线程池来异步执行回调方法。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {     private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();     ...          public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) {         return uniWhenCompleteStage(null, action);     }      public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) {         return uniWhenCompleteStage(asyncPool, action);     }     ... }  public class FutureWhenTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {             System.out.println("异步执行第一个任务");             try {                 Thread.sleep(2000L);             } catch (InterruptedException e) {                 e.printStackTrace();             }             return "第一个任务的返回值";         });         CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {             System.out.println("异步执行第二个任务");             if ("第一个任务的返回值".equals(a)) {                 System.out.println("收到传入的第一个任务的返回值");             }             System.out.println("whenComplete()执行的回调方法没有返回值");         });         System.out.println("输出" + rstFuture.get());     } }  //执行程序输出的结果如下: //异步执行第一个任务 //异步执行第二个任务 //收到传入的第一个任务的返回值 //whenComplete()执行的回调方法没有返回值 //输出第一个任务的返回值

六.handle()和handleAsync()方法

CompletableFuture的handle()方法表示:异步任务执行完成后,紧接着执行的回调方法是有返回值的。handle()方法返回的CompletableFuture的result是回调方法执行的结果。

 

CompletableFuture的handleAsync()方法会使用ForkJoin线程池来异步执行回调方法。

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {     private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();     ...          public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {         return uniHandleStage(null, fn);     }      public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) {         return uniHandleStage(asyncPool, fn);     }     ... }  public class FutureHandlerTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {             System.out.println("");             try {                 Thread.sleep(2000L);             } catch (InterruptedException e) {                 e.printStackTrace();             }             return "第一个任务的返回值";         });         CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> {             System.out.println("执行第二个任务");             if ("第一个任务的返回值".equals(a)) {                 System.out.println("收到传入的第一个任务的返回值");                 return "第二个任务的返回值";             }             return "第二个任务的返回值";         });         System.out.println("输出" + rstFuture.get());     } }  //执行程序输出的结果如下: //执行第二个任务 //收到传入的第一个任务的返回值 //输出第二个任务的返回值

(6)CompletableFuture的多个任务组合处理

一.AND组合关系

thenCombine()、thenAcceptBoth()、runAfterBoth()都表示:将两个CompletableFuture任务组合起来,只有这两个任务都正常执行完后,才会执行后面的回调方法。区别如下:

 

thenCombine()方法会将两个任务的执行结果作为方法入参,传递到指定的回调方法中,且指定的回调方法有返回值。

 

thenAcceptBoth()方法会将两个任务的执行结果作为方法入参,传递到指定的回调方法中,但指定的回调方法无返回值。

 

runAfterBoth()方法则不会把两个任务的执行结果当做方法入参,传递到指定的回调方法中,且指定的回调方法没有返回值。

public class ThenCombineTest {     public static void main(String[] args) throws Exception {         CompletableFuture<String> firstFuture = CompletableFuture.supplyAsync(() -> {             System.out.println("第一个异步任务要执行3秒");             try {                 Thread.sleep(3000L);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("第一个异步任务执行完毕");             return "第一个任务的返回值";         });         ExecutorService executor = Executors.newFixedThreadPool(2);         CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {             System.out.println("第二个异步任务要执行2秒");             try {                 Thread.sleep(2000L);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("第二个异步任务执行完毕");             return "第二个任务的返回值";         }, executor).thenCombineAsync(firstFuture, (secondResult, firstResult) -> {             System.out.println("两个异步任务都执行完毕后才能执行这里");             System.out.println("接收到" + firstResult);             System.out.println("接收到" + secondResult);             return "两个异步任务都执行完后执行的回调的返回值";         }, executor);         System.out.println("输出" + future.join());         executor.shutdown();     } }  //执行程序输出的结果如下: //第一个异步任务要执行3秒 //第二个异步任务要执行2秒 //第二个异步任务执行完毕 //第一个异步任务执行完毕 //两个异步任务都执行完毕后才能执行这里 //接收到第一个任务的返回值 //接收到第二个任务的返回值 //输出两个异步任务都执行完后执行的回调的返回值

二.OR组合关系

applyToEither()、acceptEither()、runAfterEither()都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。区别如下:

 

applyToEither()方法会将已经执行完成的任务的结果,作为方法入参,传递到指定的回调方法中,且指定的回调方法有返回值。

 

acceptEither()方法会将已经执行完成的任务的结果,作为方法入参,传递到指定的回调方法中,且指定的回调方法无返回值。

 

runAfterEither()方法不会把已经执行完成的任务的结果当做方法入参,传递到指定的回调方法中,且指定的回调方法没有返回值。

public class AcceptEitherTest {     public static void main(String[] args) {         //第一个异步任务,休眠2秒,保证它执行晚点         CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {             try{                 Thread.sleep(2000L);                 System.out.println("执行完第一个任务");             } catch (Exception e) {                 return "执行第一个任务异常";             }             return "返回第一个任务的结果";         });         ExecutorService executor = Executors.newSingleThreadExecutor();         //第二个异步任务         CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {             System.out.println("执行完第二个任务");             return "返回第二个任务的结果";         }, executor).acceptEitherAsync(first, (lastResult) -> {             System.out.println("执行完第一个或第二个任务后的回调");             System.out.println("获取到传入的先执行完的任务的返回结果是:" + lastResult);         }, executor);         executor.shutdown();     } }  //执行程序输出的结果如下: //执行完第二个任务 //执行完第一个或第二个任务后的回调 //获取到传入的先执行完的任务的返回结果是:返回第二个任务的结果

三.anyOf

任意一个任务执行完,就执行anyOf()方法返回的CompletableFuture。如果执行的任务异常,anyOf()方法返回的CompletableFuture在执行get()方法时,会抛出异常。

public class AnyOfFutureTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {             try {                 Thread.sleep(3000L);             } catch (InterruptedException e) {                 e.printStackTrace();             }             System.out.println("任务A执行完了");         });         CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {             System.out.println("任务B执行完了");         });         CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m, k) -> {             System.out.println("finish");         });         anyOfFuture.join();     } }  //执行程序输出的结果如下: //任务B执行完了 //finish

四.allOf

所有任务都执行完成后,才执行allOf()方法返回的CompletableFuture。如果任意一个任务异常,allOf()方法返回的CompletableFuture在执行get()方法时,会抛出异常。

public class allOfFutureTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CompletableFuture<Void> a = CompletableFuture.runAsync(() -> {             System.out.println("任务A执行完了");         });         CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {             System.out.println("任务B执行完了");         });         CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m, k) -> {             System.out.println("finish:" + m + "," + k);         });     } }  //执行程序输出的结果如下: //任务A执行完了 //任务B执行完了 //finish: null,null

五.thenCompose

thenCompose()方法会在某个任务执行完成后,将该任务的执行结果作为方法入参去执行指定的方法。thenCompose()方法会返回一个新的CompletableFuture实例。

public class ThenComposeTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         CompletableFuture<String> f = CompletableFuture.completedFuture("第一个任务");         ExecutorService executor = Executors.newSingleThreadExecutor();         CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {             System.out.println("执行第二个任务");             return "返回第二个任务的结果";         }, executor).thenComposeAsync(data -> {             System.out.println("执行第三个任务");             System.out.println("收到传入的:" + data);             return f;         }, executor);         System.out.println(future.join());         executor.shutdown();     } }  //执行程序输出的结果如下: //执行第二个任务 //执行第三个任务 //收到传入的:返回第二个任务的结果 //第一个任务

(7)CompletableFuture的使用注意事项

JUC并发—13.Future模式和异步编程简介

一.Future需要获取返回值,才能获取异常信息

public class ThenComposeTest {     public static void main(String[] args) throws ExecutionException, InterruptedException {         ExecutorService executorService = newThreadPoolExecutor(             5,             10,             5L,             TimeUnit.SECONDS,             new ArrayBlockingQueue<>(10)         );         CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {             int a = 0;             int b = 666;             int c = b / a;             return true;         },executorService).thenAccept((a) -> {             System.out.println(a);         });          //如果如下这一行get()方法,是看不到异常信息的         //future.get();     } }

二.CompletableFuture的get()方法是阻塞的

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。

 

三.默认线程池的注意点

CompletableFuture代码中使用了默认的线程池,处理的线程个数是机器CPU核数 - 1。在大量请求过来时,如果处理逻辑复杂,那么响应就会很慢。所以一般建议使用自定义线程池,优化线程池配置参数。

 

四.自定义线程池时注意饱和策略

由于CompletableFuture的get()方法是阻塞的,所以一般建议使用类似future.get(3, TimeUnit.SECONDS),并且一般建议使用自定义线程池。

 

但如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,那么当线程池饱和时,会直接丢弃任务,不会抛出异常。

 

因此建议CompletableFuture线程池的拒绝策略最好使用AbortPolicy,然后对耗时的异步线程做好线程池隔离。

 

发表评论

评论已关闭。

相关文章

当前内容话题
  • 0