02.02.03 CompletableFuture 实战案例:电商订单与 API 聚合
原则说明独立异常处理每个任务独立处理异常,避免连锁失败设置超时所有远程调用必须设置超时提供降级非核心功能失败时返回默认值日志追踪记录关键节点,便于排查问题资源隔离不同类型任务使用不同线程池订单处理:并行校验 → 支付 → 并行后续处理API 聚合:并行调用 + 独立超时 + 独立降级批量处理:进度跟踪 + 错误隔离 + 结果汇总重试机制:指数退避 + 熔断器核心思想:将复杂流程拆解为独立任务,并行
·
02.02.03 CompletableFuture 实战案例:电商订单与 API 聚合
导读
理论知识需要通过实战来巩固。本文将通过真实的业务场景——电商订单处理和 API 聚合网关,展示 CompletableFuture 在生产环境中的最佳实践。
每个案例都包含完整的代码实现、性能对比和注意事项,帮助你在实际项目中灵活运用。
适用人群:已掌握 CompletableFuture API 的开发者,想要学习实战应用
学习目标:
- 掌握电商订单异步处理流程设计
- 学会 API 聚合网关的实现技巧
- 理解批量任务处理与进度跟踪
- 掌握超时控制与重试机制
一、电商订单处理流水线
1.1 业务场景
电商下单流程涉及多个服务调用:
- 校验用户信息
- 检查库存
- 调用支付服务
- 发送通知
- 记录订单
串行 vs 并行性能对比:
串行处理(传统方式):
用户校验(100ms) → 库存检查(150ms) → 支付(500ms) → 通知(200ms) → 记录(100ms)
总耗时: 1050ms
并行处理(CompletableFuture):
┌─ 用户校验(100ms) ─┐
│ ├── 支付(500ms) → 通知(200ms)
└─ 库存检查(150ms) ─┘ ↓
记录(100ms)
总耗时: 150ms + 500ms + 200ms = 850ms(节省 19%)
1.2 完整实现
@Service
public class OrderService {
private final UserService userService;
private final InventoryService inventoryService;
private final PaymentService paymentService;
private final NotificationService notificationService;
private final OrderRepository orderRepository;
private final ExecutorService ioExecutor;
public OrderService() {
this.ioExecutor = new ThreadPoolExecutor(
16, 64, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadFactoryBuilder().setNameFormat("order-io-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
/**
* 异步处理订单
*/
public CompletableFuture<OrderResult> processOrderAsync(OrderRequest request) {
String orderId = UUID.randomUUID().toString();
// 第一阶段:并行校验(用户 + 库存)
CompletableFuture<UserInfo> userFuture = CompletableFuture
.supplyAsync(() -> userService.validateUser(request.getUserId()), ioExecutor)
.orTimeout(1, TimeUnit.SECONDS)
.exceptionally(ex -> {
log.warn("用户校验失败,使用默认信息", ex);
return UserInfo.guest();
});
CompletableFuture<StockInfo> stockFuture = CompletableFuture
.supplyAsync(() -> inventoryService.checkStock(request.getItems()), ioExecutor)
.orTimeout(1, TimeUnit.SECONDS);
// 第二阶段:合并校验结果,创建订单上下文
CompletableFuture<OrderContext> contextFuture = userFuture
.thenCombine(stockFuture, (user, stock) -> {
if (!stock.isAvailable()) {
throw new InsufficientStockException("库存不足");
}
return new OrderContext(orderId, user, stock, request);
});
// 第三阶段:支付
CompletableFuture<PaymentResult> paymentFuture = contextFuture
.thenCompose(ctx -> CompletableFuture
.supplyAsync(() -> paymentService.pay(ctx), ioExecutor)
.orTimeout(5, TimeUnit.SECONDS)
);
// 第四阶段:支付成功后,并行执行通知和记录
return paymentFuture
.thenCompose(payment -> {
// 异步发送通知(不影响主流程)
CompletableFuture.runAsync(() ->
notificationService.sendOrderSuccess(orderId, request.getUserId()),
ioExecutor
).exceptionally(ex -> {
log.error("发送通知失败", ex);
return null;
});
// 记录订单
return CompletableFuture.supplyAsync(() ->
orderRepository.save(orderId, request, payment),
ioExecutor
);
})
// 统一结果封装
.handle((saved, ex) -> {
if (ex != null) {
Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
log.error("订单处理失败: orderId={}", orderId, cause);
// 补偿:回滚库存
inventoryService.revertStockAsync(request.getItems());
if (cause instanceof InsufficientStockException) {
return OrderResult.outOfStock(orderId);
}
if (cause instanceof PaymentException) {
return OrderResult.paymentFailed(orderId, cause.getMessage());
}
if (cause instanceof TimeoutException) {
return OrderResult.timeout(orderId);
}
return OrderResult.systemError(orderId, cause.getMessage());
}
log.info("订单处理成功: orderId={}", orderId);
return OrderResult.success(orderId, saved.getTransactionId());
});
}
}
1.3 流程图
┌─────────────────────────────────────────────────────────────┐
│ 订单处理流程 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────┴─────────────────────┐
│ 第一阶段:并行校验 │
├─────────────────────┬─────────────────────┤
▼ ▼
┌───────────────┐ ┌───────────────┐
│ 用户校验 │ │ 库存检查 │
│ (100ms) │ │ (150ms) │
└───────┬───────┘ └───────┬───────┘
│ │
└──────────┬──────────┘
▼
┌────────────────────┐
│ 第二阶段:合并结果 │
│ 创建订单上下文 │
└─────────┬──────────┘
▼
┌────────────────────┐
│ 第三阶段:支付 │
│ (500ms) │
└─────────┬──────────┘
▼
┌──────────────┴──────────────┐
│ 第四阶段:并行后续处理 │
├──────────────┬──────────────┤
▼ ▼
┌──────────┐ ┌──────────┐
│ 发送通知 │ │ 记录订单 │
│ (异步) │ │ │
└──────────┘ └────┬─────┘
▼
┌───────────────┐
│ 返回结果 │
└───────────────┘
二、API 聚合网关
2.1 业务场景
移动端首页需要聚合多个微服务的数据:
- 用户信息服务
- 订单服务
- 优惠券服务
- 推荐服务
要求:并行调用,设置超时,部分失败不影响整体。
2.2 完整实现
@RestController
@RequestMapping("/api/v1/dashboard")
public class DashboardController {
private final UserClient userClient;
private final OrderClient orderClient;
private final CouponClient couponClient;
private final RecommendClient recommendClient;
private final ExecutorService ioExecutor;
@GetMapping("/{userId}")
public CompletableFuture<Dashboard> getDashboard(@PathVariable String userId) {
long startTime = System.currentTimeMillis();
// 并行调用四个服务,每个都有独立的超时和降级
CompletableFuture<UserProfile> userFuture = fetchWithFallback(
() -> userClient.getProfile(userId),
UserProfile.anonymous(),
"用户服务",
1000
);
CompletableFuture<List<OrderSummary>> ordersFuture = fetchWithFallback(
() -> orderClient.getRecentOrders(userId, 5),
Collections.emptyList(),
"订单服务",
1500
);
CompletableFuture<List<Coupon>> couponsFuture = fetchWithFallback(
() -> couponClient.getAvailableCoupons(userId),
Collections.emptyList(),
"优惠券服务",
1000
);
CompletableFuture<List<Product>> recommendsFuture = fetchWithFallback(
() -> recommendClient.getRecommendations(userId, 10),
Collections.emptyList(),
"推荐服务",
2000
);
// 聚合所有结果
return CompletableFuture.allOf(
userFuture, ordersFuture, couponsFuture, recommendsFuture
).thenApply(v -> {
long elapsed = System.currentTimeMillis() - startTime;
log.info("Dashboard 聚合完成,耗时 {}ms", elapsed);
return Dashboard.builder()
.user(userFuture.join())
.recentOrders(ordersFuture.join())
.coupons(couponsFuture.join())
.recommendations(recommendsFuture.join())
.build();
});
}
/**
* 通用的带降级的异步调用
*/
private <T> CompletableFuture<T> fetchWithFallback(
Supplier<T> supplier,
T fallback,
String serviceName,
long timeoutMs) {
return CompletableFuture
.supplyAsync(supplier, ioExecutor)
.orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
.exceptionally(ex -> {
Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
if (cause instanceof TimeoutException) {
log.warn("{}调用超时 (>{}ms),使用降级值", serviceName, timeoutMs);
} else {
log.error("{}调用失败,使用降级值", serviceName, cause);
}
metrics.increment("api.fallback",
"service", serviceName,
"reason", cause.getClass().getSimpleName()
);
return fallback;
});
}
}
2.3 性能监控增强
@Component
public class AsyncApiMonitor {
private final MeterRegistry meterRegistry;
/**
* 带监控的异步调用
*/
public <T> CompletableFuture<T> monitoredAsync(
String operationName,
Supplier<T> supplier,
ExecutorService executor) {
Timer.Sample sample = Timer.start(meterRegistry);
return CompletableFuture
.supplyAsync(supplier, executor)
.whenComplete((result, ex) -> {
String status = (ex == null) ? "success" : "failure";
sample.stop(Timer.builder("async.operation")
.tag("operation", operationName)
.tag("status", status)
.register(meterRegistry));
if (ex != null) {
meterRegistry.counter("async.error",
"operation", operationName,
"exception", ex.getClass().getSimpleName()
).increment();
}
});
}
}
三、批量任务处理
3.1 业务场景
需要批量处理用户数据,要求:
- 并行处理提高效率
- 进度跟踪
- 错误不影响其他任务
- 最终汇总结果
3.2 完整实现
@Service
public class BatchProcessor {
private final ExecutorService executor;
public BatchProcessor() {
this.executor = new ThreadPoolExecutor(
8, 32, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("batch-%d").build()
);
}
/**
* 批量处理用户数据
*/
public CompletableFuture<BatchResult> processBatch(
List<String> userIds,
Consumer<Progress> progressCallback) {
AtomicInteger completed = new AtomicInteger(0);
AtomicInteger failed = new AtomicInteger(0);
List<CompletableFuture<UserResult>> futures = userIds.stream()
.map(userId -> processUserAsync(userId)
.whenComplete((result, ex) -> {
int current = completed.incrementAndGet();
if (ex != null || !result.isSuccess()) {
failed.incrementAndGet();
}
// 回调进度
progressCallback.accept(new Progress(
current,
userIds.size(),
failed.get()
));
})
)
.collect(Collectors.toList());
// 等待所有任务完成
return CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
List<UserResult> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return new BatchResult(
userIds.size(),
completed.get() - failed.get(),
failed.get(),
results
);
});
}
/**
* 处理单个用户(带重试)
*/
private CompletableFuture<UserResult> processUserAsync(String userId) {
return CompletableFuture
.supplyAsync(() -> processUser(userId), executor)
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(ex -> {
log.error("处理用户失败: {}", userId, ex);
return UserResult.failure(userId, ex.getMessage());
});
}
private UserResult processUser(String userId) {
// 模拟处理逻辑
try {
Thread.sleep(100 + new Random().nextInt(200));
if (Math.random() < 0.05) {
throw new RuntimeException("随机失败");
}
return UserResult.success(userId, "处理完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
// 使用
batchProcessor.processBatch(userIds, progress -> {
log.info("进度: {}/{}, 失败: {}",
progress.getCurrent(),
progress.getTotal(),
progress.getFailedCount()
);
}).thenAccept(result -> {
log.info("处理完成: 成功={}, 失败={}",
result.getSuccessCount(),
result.getFailureCount()
);
});
3.3 分批处理(控制并发)
/**
* 分批处理,控制并发数
*/
public CompletableFuture<List<UserResult>> processBatchWithConcurrencyLimit(
List<String> userIds, int batchSize) {
List<List<String>> batches = Lists.partition(userIds, batchSize);
CompletableFuture<List<UserResult>> resultFuture =
CompletableFuture.completedFuture(new ArrayList<>());
for (List<String> batch : batches) {
resultFuture = resultFuture.thenCompose(prevResults -> {
// 并行处理当前批次
List<CompletableFuture<UserResult>> batchFutures = batch.stream()
.map(this::processUserAsync)
.collect(Collectors.toList());
return CompletableFuture
.allOf(batchFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
List<UserResult> batchResults = batchFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
List<UserResult> combined = new ArrayList<>(prevResults);
combined.addAll(batchResults);
return combined;
});
});
}
return resultFuture;
}
四、超时与重试机制
4.1 指数退避重试
public class RetryableService {
private final ExecutorService executor;
private final ScheduledExecutorService scheduler;
/**
* 带指数退避的重试
*/
public <T> CompletableFuture<T> executeWithRetry(
Supplier<T> supplier,
int maxRetries,
long initialDelayMs) {
return executeWithRetryInternal(supplier, maxRetries, initialDelayMs, 0);
}
private <T> CompletableFuture<T> executeWithRetryInternal(
Supplier<T> supplier,
int maxRetries,
long delayMs,
int attempt) {
return CompletableFuture
.supplyAsync(supplier, executor)
.exceptionally(ex -> {
if (attempt >= maxRetries) {
log.error("重试 {} 次后仍然失败", maxRetries, ex);
throw new RuntimeException("超过最大重试次数", ex);
}
log.warn("第 {} 次尝试失败,{}ms 后重试",
attempt + 1, delayMs, ex);
// 递归重试(注意:这是同步递归,需要改进)
throw new RuntimeException("需要重试", ex);
})
.thenApply(result -> result)
.exceptionally(ex -> {
// 延迟后重试
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return executeWithRetryInternal(
supplier,
maxRetries,
delayMs * 2, // 指数退避
attempt + 1
).join();
});
}
/**
* 改进版:完全异步的重试
*/
public <T> CompletableFuture<T> executeWithAsyncRetry(
Supplier<CompletableFuture<T>> futureSupplier,
int maxRetries,
long initialDelayMs) {
CompletableFuture<T> result = new CompletableFuture<>();
executeWithAsyncRetryInternal(
futureSupplier, result, maxRetries, initialDelayMs, 0
);
return result;
}
private <T> void executeWithAsyncRetryInternal(
Supplier<CompletableFuture<T>> futureSupplier,
CompletableFuture<T> result,
int maxRetries,
long delayMs,
int attempt) {
futureSupplier.get().whenComplete((value, ex) -> {
if (ex == null) {
result.complete(value);
return;
}
if (attempt >= maxRetries) {
result.completeExceptionally(
new RuntimeException("超过最大重试次数: " + maxRetries, ex)
);
return;
}
log.warn("第 {} 次尝试失败,{}ms 后重试", attempt + 1, delayMs);
// 延迟后异步重试
scheduler.schedule(() -> {
executeWithAsyncRetryInternal(
futureSupplier, result, maxRetries, delayMs * 2, attempt + 1
);
}, delayMs, TimeUnit.MILLISECONDS);
});
}
}
4.2 熔断器模式
public class CircuitBreaker {
private final AtomicInteger failureCount = new AtomicInteger(0);
private final AtomicLong lastFailureTime = new AtomicLong(0);
private final int threshold;
private final long resetTimeMs;
public CircuitBreaker(int threshold, long resetTimeMs) {
this.threshold = threshold;
this.resetTimeMs = resetTimeMs;
}
public <T> CompletableFuture<T> execute(
Supplier<CompletableFuture<T>> supplier,
T fallback) {
// 检查熔断状态
if (isOpen()) {
log.warn("熔断器开启,返回降级值");
return CompletableFuture.completedFuture(fallback);
}
return supplier.get()
.whenComplete((result, ex) -> {
if (ex != null) {
recordFailure();
} else {
reset();
}
})
.exceptionally(ex -> {
log.warn("调用失败,返回降级值", ex);
return fallback;
});
}
private boolean isOpen() {
if (failureCount.get() >= threshold) {
long elapsed = System.currentTimeMillis() - lastFailureTime.get();
if (elapsed < resetTimeMs) {
return true;
}
// 半开状态,尝试恢复
failureCount.set(threshold / 2);
}
return false;
}
private void recordFailure() {
failureCount.incrementAndGet();
lastFailureTime.set(System.currentTimeMillis());
}
private void reset() {
failureCount.set(0);
}
}
// 使用
CircuitBreaker breaker = new CircuitBreaker(5, 30000);
CompletableFuture<String> result = breaker.execute(
() -> httpClient.getAsync(url),
"降级值"
);
五、最佳实践总结
5.1 设计原则
| 原则 | 说明 |
|---|---|
| 独立异常处理 | 每个任务独立处理异常,避免连锁失败 |
| 设置超时 | 所有远程调用必须设置超时 |
| 提供降级 | 非核心功能失败时返回默认值 |
| 日志追踪 | 记录关键节点,便于排查问题 |
| 资源隔离 | 不同类型任务使用不同线程池 |
5.2 性能优化建议
- 并行化:识别独立任务,使用
thenCombine或allOf并行执行 - 超时控制:合理设置超时,避免长时间阻塞
- 批量处理:控制并发数,避免压垮下游服务
- 连接池:HTTP/数据库连接池预热,减少冷启动延迟
5.3 常见问题排查
// 问题1:任务没有执行
// 原因:可能使用了默认 ForkJoinPool,线程被阻塞
// 解决:使用自定义线程池
// 问题2:异常被吞掉
// 原因:没有添加异常处理
// 解决:使用 exceptionally 或 handle
// 问题3:内存溢出
// 原因:创建太多 CompletableFuture
// 解决:分批处理,限制并发数
// 问题4:响应慢
// 原因:串行调用,没有并行化
// 解决:使用 thenCombine 或 allOf 并行
六、总结
通过本文的实战案例,你应该掌握了:
- 订单处理:并行校验 → 支付 → 并行后续处理
- API 聚合:并行调用 + 独立超时 + 独立降级
- 批量处理:进度跟踪 + 错误隔离 + 结果汇总
- 重试机制:指数退避 + 熔断器
核心思想:将复杂流程拆解为独立任务,并行执行提升效率,独立处理异常保证稳定性。
上一篇回顾: 《CompletableFuture 组合与异常处理:构建复杂异步流》
我们深入探讨如何组合多个 CompletableFuture,以及优雅处理异步异常的最佳实践。
下一篇预告:
《CompletableFuture 线程池与最佳实践:生产环境调优指南》
我们将深入探讨线程池配置、性能调优和生产环境最佳实践。
更多推荐

所有评论(0)