//A cancellable asynchronous computation. //This class provides a base implementation of Future, with methods to start and cancel a computation, //query to see if the computation is complete, and retrieve the result of the computation. //The result can only be retrieved when the computation has completed; //the get methods will block if the computation has not yet completed. //Once the computation has completed, the computation cannot be restarted or cancelled //(unless the computation is invoked using #runAndReset). //A FutureTask can be used to wrap a Callable or Runnable object. //Because FutureTask implements Runnable, //a FutureTask can be submitted to an Executor for execution. //In addition to serving as a standalone class, //this class provides protected functionality that may be useful when creating customized task classes. public class FutureTask<V> implements RunnableFuture<V> { ... //Creates a FutureTask that will, upon running, execute the given Callable. public FutureTask(Callable<V> callable) { if (callable == null) { throw new NullPointerException(); } this.callable = callable; this.state = NEW;//ensure visibility of callable } ... } //A Future that is Runnable. //Successful execution of the run method causes completion of the Future and allows access to its results. public interface RunnableFuture<V> extends Runnable, Future<V> { //Sets this Future to the result of its computation unless it has been cancelled. void run(); } //A Future represents the result of an asynchronous computation. //Methods are provided to check if the computation is complete, //to wait for its completion, and to retrieve the result of the computation. //The result can only be retrieved using method get when the computation has completed, //blocking if necessary until it is ready. //Cancellation is performed by the cancel method. //Additional methods are provided to determine if the task completed normally or was cancelled. //Once a computation has completed, the computation cannot be cancelled. //If you would like to use a Future for the sake of cancellability but not provide a usable result, //you can declare types of the form Future<?> and return null as a result of the underlying task. public interface Future<V> { //用来取消任务,取消成功则返回true,取消失败则返回false //mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务 //如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false //如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false //如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true boolean cancel(boolean mayInterruptIfRunning); //表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true boolean isCancelled(); //表示任务是否已经完成,若任务完成,则返回true boolean isDone(); //获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果 V get() throws InterruptedException, ExecutionException; //获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } @FunctionalInterface public interface Runnable { public abstract void run(); }
public class FutureTask<V> implements RunnableFuture<V> { //The run state of this task, initially NEW. //The run state transitions to a terminal state only in methods set, setException, and cancel. //During completion, state may take on transient values of COMPLETING (while outcome is being set) //or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)). //Transitions from these intermediate to final states use cheaper ordered/lazy writes //because values are unique and cannot be further modified. //Possible state transitions: //NEW(初始状态) -> COMPLETING(正在设置任务结果) -> NORMAL,这是任务正常执行完毕时状态的变更流程 //NEW(初始状态) -> COMPLETING(正在设置任务结果) -> EXCEPTIONAL,这是任务执行异常时状态的变更流程 //NEW(初始状态) -> CANCELLED(任务被取消),这是调用了Future.cancel()方法 //NEW(初始状态) -> INTERRUPTING(正在中断执行任务的线程) -> INTERRUPTED(任务被中断) //代表任务在运行过程中的状态(7种) private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; //The underlying callable; nulled out after running //当前要执行的任务 private Callable<V> callable; //The result to return or exception to throw from get() //任务的执行结果,通过Future.get()获取的值 private Object outcome; //The thread running the callable; CASed during run() //当前执行callable任务的线程 private volatile Thread runner; //Treiber stack of waiting threads //用来保存所有等待任务执行结束的线程的单向链表 private volatile WaitNode waiters; ... }
public class FutureTask<V> implements RunnableFuture<V> { ... //代表任务在运行过程中的状态(7种) private volatile int state; //当前要执行的任务 private Callable<V> callable; //任务的执行结果,通过Future.get()获取的值 private Object outcome; //当前执行callable任务的线程 private volatile Thread runner; //用来保存所有等待任务执行结束的线程的单向链表 private volatile WaitNode waiters; private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } } public void run() { //首先判断当前状态是否为NEW,并使用CAS把runner属性设置为当前线程 //如果当前状态不是NEW或者CAS设置失败,说明已经有其他线程正在执行当前任务了,于是直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) { return; } try { //获取通过构造方法传入的Callable接口的实现类实例callable Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //然后调用callable中的call()方法获得执行结果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) { //调用set()方法把执行结果保存到outcome属性中 set(result); } } } finally { //runner must be non-null until state is settled to prevent concurrent calls to run() runner = null; //state must be re-read after nulling runner to prevent leaked interrupts int s = state; if (s >= INTERRUPTING) { handlePossibleCancellationInterrupt(s); } } } //Sets the result of this future to the given value unless this future has already been set or has been cancelled. //This method is invoked internally by the run method upon successful completion of the computation. protected void set(V v) { //CAS修改任务状态为COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //把调用call()方法获取到的结果保存到outcome outcome = v; //CAS修改任务状态为NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } ... }
//A Future that may be explicitly completed (setting its value and status), //and may be used as a CompletionStage, //supporting dependent functions and actions that trigger upon its completion. public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { ... //Returns a new CompletableFuture that is asynchronously completed //by a task running in the ForkJoinPool#commonPool() //with the value obtained by calling the given Supplier. //@param supplier a function returning the value to be used to complete the returned CompletableFuture //@param <U> the function's return type //@return the new CompletableFuture //带有返回值的异步执行方法,传入一个函数式接口,返回一个新的CompletableFuture对象 //默认使用ForkJoinPool.commonPool()作为线程池执行异步任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } //Returns a new CompletableFuture that is asynchronously completed //by a task running in the given executor with the value obtained by calling the given Supplier. //@param supplier a function returning the value to be used to complete the returned CompletableFuture //@param executor the executor to use for asynchronous execution //@param <U> the function's return type //@return the new CompletableFuture //带有返回值的异步执行方法,传入一个函数式接口 + 一个线程池,返回一个新的CompletableFuture对象 //多了一个Executor参数,表示使用自定义线程池来执行任务 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); CompletableFuture<U> d = new CompletableFuture<U>(); e.execute(new AsyncSupply<U>(d, f)); return d; } //Returns a new CompletableFuture that is asynchronously completed //by a task running in the ForkJoinPool#commonPool() after it runs the given action. //@param runnable the action to run before completing the returned CompletableFuture //@return the new CompletableFuture //不带返回值的异步执行方法,传入一个Runnable参数,返回一个新的CompletableFuture对象 //默认使用ForkJoinPool.commonPool()作为线程池执行异步任务 public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } //Returns a new CompletableFuture that is asynchronously completed //by a task running in the given executor after it runs the given action. //@param runnable the action to run before completing the returned CompletableFuture //@param executor the executor to use for asynchronous execution //@return the new CompletableFuture //不带返回值的异步执行方法,传入一个Runnable参数 + 一个线程池,返回一个新的CompletableFuture对象 //多了一个Executor参数,表示使用自定义线程池来执行任务 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); } static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { if (f == null) throw new NullPointerException(); CompletableFuture<Void> d = new CompletableFuture<Void>(); e.execute(new AsyncRun(d, f)); return d; } ... }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, when this stage completes normally, //is executed with this stage's result as the argument to the supplied action. //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> thenAccept(Consumer<? super T> action); //Returns a new CompletionStage that, when this stage completes normally, //is executed using this stage's default asynchronous execution facility, //with this stage's result as the argument to the supplied action. //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); //Returns a new CompletionStage that, when this stage completes normally, //is executed using the supplied Executor, //with this stage's result as the argument to the supplied action. //@param action the action to perform before completing the returned CompletionStage //@param executor the executor to use for asynchronous execution //@return the new CompletionStage public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor); ... } public class CompletionStageExample { //当cf实例的任务执行完成后,会回调传入thenAcceptAsync()方法中的回调函数 //其中回调函数的result表示cf异步任务的返回结果 public static void main(String[] args) throws IOException, ExecutionException, InterruptedException { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "thenAccept message"); cf.thenAcceptAsync((result) -> { System.out.println(Thread.currentThread().getName() + "第一个异步任务的返回值:" + result); }); } }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, //when this and the other given stage both complete normally, //is executed with the two results as arguments to the supplied action. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@param <U> the type of the other CompletionStage's result //@return the new CompletionStage public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); //Returns a new CompletionStage that, //when this and the other given stage complete normally, //is executed using this stage's default asynchronous execution facility, //with the two results as arguments to the supplied action. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@param <U> the type of the other CompletionStage's result //@return the new CompletionStage public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action); //Returns a new CompletionStage that, //when this and the other given stage complete normally, //is executed using the supplied executor, //with the two results as arguments to the supplied function. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@param executor the executor to use for asynchronous execution //@param <U> the type of the other CompletionStage's result //@return the new CompletionStage public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor); ... } public class ThenAcceptBothExample { //task1和task2都执行完成后,会得到两个任务的返回值AcceptBoth和message, //接着开始执行thenAcceptBoth()中的action, //这个action会接收前面两个任务的执行结果r1和r2,并最终打印出:执行结果为"AcceptBoth+message" public static void main(String[] args) { CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "AcceptBoth"); CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "message"); task1.thenAcceptBoth(task2, (r1, r2) -> { System.out.println("执行结果" + r1 + "+" + r2); }); //或者采用Fluent风格来写 //CompletableFuture.supplyAsync(() -> "AcceptBoth").thenAcceptBoth( // CompletableFuture.supplyAsync(() -> "message"), (r1, r2) -> { // System.out.println("执行结果:" + r1 + ", " + r2); // } //); } }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, //when either this or the other given stage complete normally, //is executed with the corresponding result as argument to the supplied action. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action); //Returns a new CompletionStage that, //when either this or the other given stage complete normally, //is executed using this stage's default asynchronous execution facility, //with the corresponding result as argument to the supplied action. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action); //Returns a new CompletionStage that, //when either this or the other given stage complete normally, //is executed using the supplied executor, //with the corresponding result as argument to the supplied function. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@param executor the executor to use for asynchronous execution //@return the new CompletionStage public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor); ... }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, when this stage completes normally, //is executed with this stage's result as the argument to the supplied function. //@param fn the function to use to compute the value of the returned CompletionStage //@param <U> the function's return type //@return the new CompletionStage public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); //Returns a new CompletionStage that, when this stage completes normally, //is executed using this stage's default asynchronous execution facility, //with this stage's result as the argument to the supplied function. //@param fn the function to use to compute the value of the returned CompletionStage //@param <U> the function's return type //@return the new CompletionStage public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); //Returns a new CompletionStage that, when this stage completes normally, //is executed using the supplied Executor, //with this stage's result as the argument to the supplied function. //@param fn the function to use to compute the value of the returned CompletionStage //@param executor the executor to use for asynchronous execution //@param <U> the function's return type //@return the new CompletionStage public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor); ... }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, //when this and the other given stage both complete normally, //is executed with the two results as arguments to the supplied function. //@param other the other CompletionStage //@param fn the function to use to compute the value of the returned CompletionStage //@param <U> the type of the other CompletionStage's result //@param <V> the function's return type //@return the new CompletionStage public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); //Returns a new CompletionStage that, //when this and the other given stage complete normally, //is executed using this stage's default asynchronous execution facility, //with the two results as arguments to the supplied function. //@param other the other CompletionStage //@param fn the function to use to compute the value of the returned CompletionStage //@param <U> the type of the other CompletionStage's result //@param <V> the function's return type //@return the new CompletionStage public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn); //Returns a new CompletionStage that, //when this and the other given stage complete normally, //is executed using the supplied executor, //with the two results as arguments to the supplied function. //@param other the other CompletionStage //@param fn the function to use to compute the value of the returned CompletionStage //@param executor the executor to use for asynchronous execution //@param <U> the type of the other CompletionStage's result //@param <V> the function's return type //@return the new CompletionStage public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor); ... } public class ThenCombineExample { public static void main(String[] args) { CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Combine"); CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "message"); CompletableFuture<String> cf = task1.thenCombineAsync(task2, (r1, r2) -> { System.out.println("执行结果:" + r1 + ", " + r2); return r1 + r2; }); System.out.println(cf.get()); //或者采用Fluent风格来写 //CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Combine").thenCombineAsync( // CompletableFuture.supplyAsync(() -> "message"), (r1, r2) -> { // System.out.println("执行结果:" + r1 + ", " + r2); // return r1 + r2; // } //); //System.out.println(cf.get()); } }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, //when either this or the other given stage complete normally, //is executed with the corresponding result as argument to the supplied function. //@param other the other CompletionStage //@param fn the function to use to compute the value of the returned CompletionStage //@param <U> the function's return type //@return the new CompletionStage public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn); //Returns a new CompletionStage that, //when either this or the other given stage complete normally, //is executed using this stage's default asynchronous execution facility, //with the corresponding result as argument to the supplied function. //@param other the other CompletionStage //@param fn the function to use to compute the value of the returned CompletionStage //@param <U> the function's return type //@return the new CompletionStage public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn); //Returns a new CompletionStage that, //when either this or the other given stage complete normally, //is executed using the supplied executor, //with the corresponding result as argument to the supplied function. //@param other the other CompletionStage //@param fn the function to use to compute the value of the returned CompletionStage //@param executor the executor to use for asynchronous execution //@param <U> the function's return type //@return the new CompletionStage public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor); ... }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, //when this stage completes normally, executes the given action. //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> thenRun(Runnable action); //Returns a new CompletionStage that, when this stage completes normally, //executes the given action using this stage's default asynchronous execution facility. //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> thenRunAsync(Runnable action); //Returns a new CompletionStage that, when this stage completes normally, //executes the given action using the supplied Executor. //@param action the action to perform before completing the returned CompletionStage //@param executor the executor to use for asynchronous execution //@return the new CompletionStage public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor); ... }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, //when this and the other given stage both complete normally, executes the given action. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action); //Returns a new CompletionStage that, //when this and the other given stage complete normally, //executes the given action using this stage's default asynchronous execution facility. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action); //Returns a new CompletionStage that, //when this and the other given stage complete normally, //executes the given action using the supplied executor. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@param executor the executor to use for asynchronous execution //@return the new CompletionStage public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor); ... }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, //when either this or the other given stage complete normally, executes the given action. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action); //Returns a new CompletionStage that, //when either this or the other given stage complete normally, //executes the given action using this stage's default asynchronous execution facility. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@return the new CompletionStage public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action); //Returns a new CompletionStage that, //when either this or the other given stage complete normally, //executes the given action using the supplied executor. //@param other the other CompletionStage //@param action the action to perform before completing the returned CompletionStage //@param executor the executor to use for asynchronous execution //@return the new CompletionStage public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor); ... }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, when this stage completes normally, //is executed with this stage as the argument to the supplied function. //@param fn the function returning a new CompletionStage //@param <U> the type of the returned CompletionStage's result //@return the CompletionStage public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); //Returns a new CompletionStage that, when this stage completes normally, //is executed using this stage's default asynchronous execution facility, //with this stage as the argument to the supplied function. //@param fn the function returning a new CompletionStage //@param <U> the type of the returned CompletionStage's result //@return the CompletionStage public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn); //Returns a new CompletionStage that, when this stage completes normally, //is executed using the supplied Executor, //with this stage's result as the argument to the supplied function. //@param fn the function returning a new CompletionStage //@param executor the executor to use for asynchronous execution //@param <U> the type of the returned CompletionStage's result //@return the CompletionStage public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor); ... } public class ThenComposeExample { //下面使用supplyAsync()方法构建了一个异步带返回值的任务,返回值为"Compose Message"; //接着使用thenCompose()方法组合另外一个任务,并把前面任务的返回值r作为参数传递给第二个任务 //在第二个任务中同样使用supplyAsync()方法构建了一个新的任务将参数r转为大写 //最后thenCompose()方法返回一个新的没有返回值的CompletionStage对象 public static void main(String[] args) { CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Compose Message"); CompletableFuture<String> cf = task1.thenCompose(r -> CompletableFuture.supplyAsync(() -> r.toUpperCase())); System.out.println(cf.get()); //或者采用Fluent风格来写 //CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Compose Message") // .thenCompose(r -> CompletableFuture.supplyAsync(() -> r.toUpperCase()) //); //System.out.println(cf.get()); } }
public interface CompletionStage<T> { ... //Returns a new CompletionStage with the same result or exception as this stage, //that executes the given action when this stage completes. //@param action the action to perform //@return the new CompletionStage public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); //Returns a new CompletionStage with the same result or exception as this stage, //that executes the given action using this stage's default asynchronous execution facility when this stage completes. //@param action the action to perform //@return the new CompletionStage public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); //Returns a new CompletionStage with the same result or exception as this stage, //that executes the given action using the supplied Executor when this stage completes. //@param action the action to perform //@param executor the executor to use for asynchronous execution //@return the new CompletionStage public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor); ... }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, //when this stage completes either normally or exceptionally, //is executed with this stage's result and exception as arguments to the supplied function. //@param fn the function to use to compute the value of the returned CompletionStage //@param <U> the function's return type //@return the new CompletionStage public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); //Returns a new CompletionStage that, //when this stage completes either normally or exceptionally, //is executed using this stage's default asynchronous execution facility, //with this stage's result and exception as arguments to the supplied function. //@param fn the function to use to compute the value of the returned CompletionStage //@param <U> the function's return type //@return the new CompletionStage public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); //Returns a new CompletionStage that, //when this stage completes either normally or exceptionally, //is executed using the supplied executor, //with this stage's result and exception as arguments to the supplied function. //@param fn the function to use to compute the value of the returned CompletionStage //@param executor the executor to use for asynchronous execution //@param <U> the function's return type //@return the new CompletionStage public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor); ... } public class HandleExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture cf = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Exception"); }).handleAsync((r, th) -> { return th != null ? "出现异常" : "正常执行"; }); System.out.println(cf.get()); } }
public interface CompletionStage<T> { ... //Returns a new CompletionStage that, //when this stage completes exceptionally, //is executed with this stage's exception as the argument to the supplied function. //Otherwise, if this stage completes normally, //then the returned stage also completes normally with the same value. //@param fn the function to use to compute the value of the returned CompletionStage if this CompletionStage completed exceptionally //@return the new CompletionStage public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn); ... } public class ExceptionallyExample { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture cf = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Exception"); }).exceptionally(e -> { log.error(e); return "ExceptionallyExample"; }); System.out.println(cf.get()); } }
public class CompletionStageExample { public static void main(String[] args) throws InterruptedException, ExecutionException { CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "thenAccept message") .thenAcceptAsync((result) -> { System.out.println("第一个异步任务的返回值:" + result); }); cf.get(); } }
(2)CompletableFuture如何存储任务
一.CompletableFuture的成员变量
CompletableFuture的成员变量只有两个:result和stack。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { ... //表示CompletionStage任务的返回结果或者一个异常的封装对象AltResult volatile Object result;//Either the result or boxed AltResult //表示依赖操作栈的栈顶,链式调用中传递的任务都会被压入这个stack中 volatile Completion stack;//Top of Treiber stack of dependent actions ... }
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { ... abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { volatile Completion next;//Treiber stack link //Performs completion action if triggered, returning a dependent that may need propagation, if one exists. //@param mode SYNC, ASYNC, or NESTED abstract CompletableFuture<?> tryFire(int mode); //Returns true if possibly still triggerable. Used by cleanStack. abstract boolean isLive(); public final void run() { tryFire(ASYNC); } public final boolean exec() { tryFire(ASYNC); return true; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } } ... }
//Represents a supplier of results. //There is no requirement that a new or distinct result be returned each time the supplier is invoked. //This is a functional interface whose functional method is get(). //@param <T> the type of results supplied by this supplier @FunctionalInterface public interface Supplier<T> { //Gets a result. //@return a result T get(); } public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { volatile Object result; // Either the result or boxed AltResult volatile Completion stack; // Top of Treiber stack of dependent actions //Returns a new CompletableFuture that is asynchronously completed by a task //running in the ForkJoinPool#commonPool() with the value obtained by calling the given Supplier. //@param supplier a function returning the value to be used to complete the returned CompletableFuture //@param <U> the function's return type //@return the new CompletableFuture public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); CompletableFuture<U> d = new CompletableFuture<U>(); //使用线程池来执行一个由AsyncSupply()方法构建的任务 e.execute(new AsyncSupply<U>(d, f)); //返回一个新的CompletableFuture对象 return d; } static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask { CompletableFuture<T> dep; Supplier<T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) { this.dep = dep; this.fn = fn; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { run(); return true; } public void run() { CompletableFuture<T> d; Supplier<T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { //首先使用f.get()来获得Supplier这个函数式接口中的执行结果 //然后通过执行CompletableFuture的completeValue()方法, //把执行结果设置到CompletableFuture的成员变量result中; d.completeValue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); } } //最后调用CompletableFuture的postComplete()方法执行Completion Stack中的所有回调任务 d.postComplete(); } } } //Completes with a non-exceptional result, unless already completed. final boolean completeValue(T t) { return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t); } //Pops and tries to trigger all reachable dependents. Call only when known to be done. final void postComplete() { //On each step, variable f holds current dependents to pop and run. //It is extended along only one path at a time, pushing others to avoid unbounded recursion. CompletableFuture<?> f = this; Completion h; while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) { CompletableFuture<?> d; Completion t; if (f.casStack(h, t = h.next)) { if (t != null) { if (f != this) { pushStack(h); continue; } h.next = null; // detach } f = (d = h.tryFire(NESTED)) == null ? this : d; } } } private static final sun.misc.Unsafe UNSAFE; private static final long RESULT; private static final long STACK; private static final long NEXT; static { try { final sun.misc.Unsafe u; UNSAFE = u = sun.misc.Unsafe.getUnsafe(); Class<?> k = CompletableFuture.class; RESULT = u.objectFieldOffset(k.getDeclaredField("result")); STACK = u.objectFieldOffset(k.getDeclaredField("stack")); NEXT = u.objectFieldOffset(Completion.class.getDeclaredField("next")); } catch (Exception x) { throw new Error(x); } } ... }