一、任务创建操作
1. runAsync() - 执行无返回值的异步任务
/** * 创建并执行无返回值的异步任务 * * @param runnable 要执行的任务逻辑(无返回值) * @return CompletableFuture<Void> 表示任务执行状态的Future对象 * * 特点: * - 任务在ForkJoinPool.commonPool()中执行 * - 完成后future结果为null * - 适合执行后台操作、日志记录等不需要返回值的场景 */ CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("执行数据库清理任务..."); cleanDatabase(); // 执行耗时清理操作 System.out.println("数据库清理完成"); });
2. supplyAsync() - 执行有返回值的异步任务
/** * 创建并执行有返回值的异步任务 * * @param supplier 提供返回值的任务逻辑 * @return CompletableFuture<T> 包含任务结果的Future对象 * * 特点: * - 默认使用ForkJoinPool.commonPool() * - 可指定自定义线程池(第二个参数) * - 适合执行需要返回结果的计算或IO操作 */ CompletableFuture<String> dataFuture = CompletableFuture.supplyAsync(() -> { System.out.println("开始从远程API获取数据..."); String result = fetchDataFromAPI(); // 模拟网络请求 System.out.println("数据获取完成"); return result; // 返回获取的数据 }); // 使用自定义线程池 ExecutorService customPool = Executors.newFixedThreadPool(4); CompletableFuture<Integer> calculationFuture = CompletableFuture.supplyAsync(() -> { System.out.println("在自定义线程池中执行复杂计算..."); return heavyCalculation(); // 返回计算结果 }, customPool);
二、结果转换操作
1. thenApply() - 同步转换结果
/** * 同步转换前阶段的结果 * * @param function 转换函数(接受前一阶段结果,返回新结果) * @return CompletableFuture<U> 包含转换结果的新Future * * 特点: * - 在前一任务完成线程中同步执行 * - 会阻塞完成线程直到转换结束 * - 适合快速、非耗时的转换操作 */ CompletableFuture<String> upperCaseFuture = dataFuture.thenApply(rawData -> { System.out.println("同步转换原始数据..."); return rawData.toUpperCase(); // 立即执行转换 }); // 链式调用示例 CompletableFuture<Integer> lengthFuture = dataFuture .thenApply(String::trim) // 第一步:去除空格 .thenApply(String::length); // 第二步:计算长度
2. thenApplyAsync() - 异步转换结果
/** * 异步转换前阶段的结果 * * @param function 转换函数 * @param executor 可选,指定执行转换的线程池 * @return CompletableFuture<U> 包含转换结果的新Future * * 特点: * - 在独立线程中执行转换 * - 不会阻塞前一任务的完成线程 * - 适合耗时较长的转换操作 */ CompletableFuture<Report> reportFuture = dataFuture.thenApplyAsync(rawData -> { System.out.println("在异步线程中生成报告..."); return generateReport(rawData); // 耗时报告生成 }); // 使用自定义线程池 ExecutorService reportPool = Executors.newFixedThreadPool(2); CompletableFuture<AnalysisResult> analysisFuture = dataFuture.thenApplyAsync(rawData -> { System.out.println("在专用线程池中执行数据分析..."); return analyzeData(rawData); // 复杂数据分析 }, reportPool);
三、结果消费操作
1. thenAccept() - 消费结果(有输入)
/** * 消费前阶段的结果(无返回值) * * @param consumer 消费函数(接受前一阶段结果) * @return CompletableFuture<Void> * * 特点: * - 接收前一阶段结果作为输入 * - 不产生返回值 * - 适合日志记录、结果存储等操作 */ dataFuture.thenAccept(result -> { System.out.println("消费获取到的数据: " + result.substring(0, 10) + "..."); saveToDatabase(result); // 将结果保存到数据库 });
2. thenRun() - 执行操作(无输入)
/** * 在前阶段完成后执行操作(无输入参数) * * @param runnable 要执行的操作 * @return CompletableFuture<Void> * * 特点: * - 不接受前阶段结果 * - 无输入参数 * - 适合执行清理、状态更新等操作 */ dataFuture.thenRun(() -> { System.out.println("数据操作完成,释放资源..."); releaseResources(); // 释放使用的资源 });
四、任务组合操作
1. thenCompose() - 链式组合
/** * 链式组合两个异步任务(flatMap) * * @param function 接受前阶段结果,返回新CompletableFuture的函数 * @return CompletableFuture<U> 组合后的新Future * * 特点: * - 解决Future<Future<T>>嵌套问题 * - 形成异步任务流水线 * - 前阶段结果作为下一任务输入 */ CompletableFuture<Integer> userIdFuture = CompletableFuture.supplyAsync(() -> { System.out.println("获取用户ID..."); return getUserId(); // 返回用户ID }); // 使用用户ID获取详情 CompletableFuture<User> userDetailFuture = userIdFuture.thenCompose(userId -> { System.out.println("使用用户ID获取详情: " + userId); return getUserDetailAsync(userId); // 返回新的Future });
2. thenCombine() - 并行组合
/** * 并行组合两个独立任务 * * @param other 另一个CompletableFuture * @param bifunction 合并两个结果的函数 * @return CompletableFuture<V> 合并结果的新Future * * 特点: * - 等待两个任务都完成 * - 合并两个任务的结果 * - 适合聚合多个独立操作的结果 */ CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> { System.out.println("获取商品价格..."); return getProductPrice(); }); CompletableFuture<Double> rateFuture = CompletableFuture.supplyAsync(() -> { System.out.println("获取汇率..."); return getExchangeRate(); }); // 组合两个独立任务的结果 CompletableFuture<Double> localPriceFuture = priceFuture.thenCombine(rateFuture, (price, rate) -> { System.out.println("计算本地价格: " + price + " * " + rate); return price * rate; // 合并计算结果 });
3. acceptEither() - 消费最先完成的结果
/** * 消费最先完成的任务结果 * * @param other 另一个CompletableFuture * @param consumer 消费函数 * @return CompletableFuture<Void> * * 特点: * - 任一任务完成即触发消费 * - 未完成任务继续执行但不处理结果 * - 适合快速响应场景 */ CompletableFuture<String> primaryService = queryService("主服务"); CompletableFuture<String> backupService = queryService("备用服务"); // 使用最先返回的结果 primaryService.acceptEither(backupService, result -> { System.out.println("使用服务响应: " + result); displayToUser(result); // 向用户显示结果 });
五、多任务协调
1. allOf() - 等待所有任务完成
/** * 等待所有任务完成 * * @param futures 多个CompletableFuture * @return CompletableFuture<Void> 所有任务完成时结束 * * 特点: * - 所有任务完成时返回 * - 需要手动获取各任务结果 * - 适合聚合多个异步操作结果 */ CompletableFuture<String> task1 = supplyAsync(() -> "结果1"); CompletableFuture<String> task2 = supplyAsync(() -> "结果2"); CompletableFuture<String> task3 = supplyAsync(() -> "结果3"); CompletableFuture<Void> allFutures = CompletableFuture.allOf(task1, task2, task3); // 所有任务完成后处理 allFutures.thenRun(() -> { // 使用join()获取结果(不会抛受检异常) String result1 = task1.join(); String result2 = task2.join(); String result3 = task3.join(); System.out.println("所有任务完成,聚合结果: " + result1 + ", " + result2 + ", " + result3); });
2. anyOf() - 等待任一任务完成
/** * 任一任务完成即返回 * * @param futures 多个CompletableFuture * @return CompletableFuture<Object> 包含首个完成的结果 * * 特点: * - 返回Object类型结果(需类型转换) * - 未完成任务继续执行 * - 适合竞态条件场景 */ CompletableFuture<String> cacheQuery = queryCache(); CompletableFuture<String> dbQuery = queryDatabase(); CompletableFuture<Object> firstResult = CompletableFuture.anyOf(cacheQuery, dbQuery); firstResult.thenAccept(result -> { // 需要显式类型转换 String data = (String) result; System.out.println("最先返回的结果: " + data); });
六、异常处理
1. exceptionally() - 异常恢复
/** * 捕获异常并返回替代值 * * @param function 异常处理函数 * @return CompletableFuture<T> 包含正常结果或替代值的新Future * * 特点: * - 仅在前阶段异常时触发 * - 可以恢复为正常结果 * - 相当于catch块 */ CompletableFuture<Integer> riskyFuture = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) { throw new RuntimeException("随机错误"); } return 42; }); CompletableFuture<Integer> safeFuture = riskyFuture.exceptionally(ex -> { System.err.println("捕获异常: " + ex.getMessage()); return 0; // 提供默认值 });
2. handle() - 双结果处理
/** * 处理正常结果和异常 * * @param bifunction 接受结果和异常的处理函数 * @return CompletableFuture<U> 处理后的新Future * * 特点: * - 无论成功失败都会执行 * - 可同时访问结果和异常 * - 必须返回新结果 */ CompletableFuture<String> apiCall = fetchFromAPI(); CompletableFuture<String> processed = apiCall.handle((result, ex) -> { if (ex != null) { System.err.println("API调用失败: " + ex.getMessage()); return "默认数据"; // 异常时返回默认值 } return "处理后的: " + result.toUpperCase(); // 正常时转换结果 });
3. whenComplete() - 完成回调
/** * 完成时回调(不改变结果) * * @param biconsumer 接受结果和异常的回调函数 * @return CompletableFuture<T> 保留原始结果的新Future * * 特点: * - 类似finally块 * - 可访问结果或异常 * - 不影响原始结果 */ dataFuture.whenComplete((result, ex) -> { if (ex != null) { System.err.println("数据处理失败: " + ex.getMessage()); metrics.recordFailure(); } else { System.out.println("数据处理成功,长度: " + result.length()); metrics.recordSuccess(); } });
七、完成控制(Java 9+)
1. completeOnTimeout() - 超时默认值
/** * 超时提供默认值 * * @param value 默认值 * @param timeout 超时时间 * @param unit 时间单位 * @return CompletableFuture<T> * * 特点: * - 超时后自动完成并返回默认值 * - 原始任务继续执行但结果被忽略 * - 避免无限期等待 */ CompletableFuture<String> slowService = callSlowService(); CompletableFuture<String> withTimeout = slowService .completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);
2. orTimeout() - 超时异常
/** * 超时抛出异常 * * @param timeout 超时时间 * @param unit 时间单位 * @return CompletableFuture<T> * * 特点: * - 超时后抛出TimeoutException * - 可配合exceptionally处理 * - 强制设置最大等待时间 */ CompletableFuture<String> serviceCall = externalService() .orTimeout(500, TimeUnit.MILLISECONDS) .exceptionally(ex -> { if (ex.getCause() instanceof TimeoutException) { return "超时回退值"; } return "其他错误回退值"; });
八、完成状态操作
1. complete() - 手动设置结果
/** * 手动设置结果值 * * @param value 要设置的结果 * @return boolean 是否设置成功(如果已完成返回false) * * 特点: * - 强制完成future * - 唤醒所有等待线程 * - 如果已完成则无效 */ CompletableFuture<String> manualFuture = new CompletableFuture<>(); // 外部线程完成 new Thread(() -> { try { String result = computeResult(); boolean set = manualFuture.complete(result); System.out.println("手动设置结果: " + set); } catch (Exception e) { manualFuture.completeExceptionally(e); } }).start();
2. completeExceptionally() - 手动设置异常
/** * 手动设置异常 * * @param ex 要设置的异常 * @return boolean 是否设置成功 * * 特点: * - 以异常状态完成future * - 触发异常处理链 * - 模拟服务失败场景 */ CompletableFuture<String> simulatedFailure = new CompletableFuture<>(); // 模拟失败 simulatedFailure.completeExceptionally(new RuntimeException("模拟异常")); simulatedFuture.exceptionally(ex -> { System.out.println("捕获模拟异常: " + ex.getMessage()); return "回退值"; });
操作对比总结表
| 操作类型 | 方法名 | 特点 | 适用场景 | 线程行为 |
|---|---|---|---|---|
| 任务创建 | runAsync | 无返回值 | 后台任务、清理操作 | 异步执行 |
| supplyAsync | 有返回值 | 数据获取、计算任务 | 异步执行 | |
| 结果转换 | thenApply | 同步转换 | 快速转换、数据预处理 | 使用前一任务线程 |
| thenApplyAsync | 异步转换 | 耗时操作、IO处理 | 使用新线程 | |
| 结果消费 | thenAccept | 消费结果 | 结果存储、日志记录 | 使用前一任务线程 |
| thenRun | 无参操作 | 资源清理、状态更新 | 使用前一任务线程 | |
| 任务组合 | thenCompose | 链式组合 | 任务流水线、依赖操作 | 使用前一任务线程 |
| thenCombine | 并行组合 | 聚合独立任务结果 | 使用前一任务线程 | |
| acceptEither | 消费首个结果 | 竞速服务、降级策略 | 使用完成任务的线程 | |
| 多任务 | allOf | 等待所有 | 批量处理、聚合数据 | 任意完成线程 |
| anyOf | 等待任一 | 快速响应、超时处理 | 首个完成线程 | |
| 异常处理 | exceptionally | 异常恢复 | 提供默认值 | 异常发生线程 |
| handle | 双结果处理 | 统一处理成功/失败 | 使用前一任务线程 | |
| whenComplete | 完成回调 | 资源清理、指标记录 | 使用前一任务线程 | |
| 完成控制 | complete | 手动设值 | 外部控制、测试模拟 | 调用者线程 |
| completeExceptionally | 手动设异常 | 模拟失败、错误注入 | 调用者线程 |
关键执行机制详解
1. 线程传递规则
- 非Async方法:在前一阶段的任务线程中同步执行
- Async方法:默认在ForkJoinPool.commonPool()执行
- 带Executor参数的Async方法:在指定线程池执行
2. 结果传递流程
graph TD A[任务完成] --> B{是否有依赖操作} B -->|有| C[触发依赖动作] C --> D[创建新阶段] D --> E[存储结果] E --> F[触发下一依赖] B -->|无| G[保存结果待后续访问]
3. 异常传播机制
- 异常会沿调用链向后传播
- 直到遇到exceptionally()或handle()处理
- 未处理的异常会导致整个链失败
- whenComplete()可访问异常但不处理
4. 依赖管理策略
- 使用栈结构存储依赖关系
- 完成后按后进先出(LIFO)顺序触发
- 保证依赖链的有序执行
- 避免嵌套过深导致的栈溢出问题
掌握这些核心操作的详细用法和执行特性,能够帮助开发者构建高效、健壮的异步处理系统,有效管理复杂的异步任务编排。