02.02.03 CompletableFuture 实战案例:电商订单与 API 聚合

导读

理论知识需要通过实战来巩固。本文将通过真实的业务场景——电商订单处理和 API 聚合网关,展示 CompletableFuture 在生产环境中的最佳实践。

每个案例都包含完整的代码实现、性能对比和注意事项,帮助你在实际项目中灵活运用。

适用人群:已掌握 CompletableFuture API 的开发者,想要学习实战应用

学习目标

  • 掌握电商订单异步处理流程设计
  • 学会 API 聚合网关的实现技巧
  • 理解批量任务处理与进度跟踪
  • 掌握超时控制与重试机制

一、电商订单处理流水线

1.1 业务场景

电商下单流程涉及多个服务调用:

  1. 校验用户信息
  2. 检查库存
  3. 调用支付服务
  4. 发送通知
  5. 记录订单

串行 vs 并行性能对比

串行处理(传统方式):
用户校验(100ms) → 库存检查(150ms) → 支付(500ms) → 通知(200ms) → 记录(100ms)
总耗时: 1050ms

并行处理(CompletableFuture):
┌─ 用户校验(100ms) ─┐
│                   ├── 支付(500ms) → 通知(200ms)
└─ 库存检查(150ms) ─┘                     ↓
                                       记录(100ms)
总耗时: 150ms + 500ms + 200ms = 850ms(节省 19%)

1.2 完整实现

@Service
public class OrderService {
    
    private final UserService userService;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final NotificationService notificationService;
    private final OrderRepository orderRepository;
    
    private final ExecutorService ioExecutor;
    
    public OrderService() {
        this.ioExecutor = new ThreadPoolExecutor(
            16, 64, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10000),
            new ThreadFactoryBuilder().setNameFormat("order-io-%d").build(),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
    
    /**
     * 异步处理订单
     */
    public CompletableFuture<OrderResult> processOrderAsync(OrderRequest request) {
        String orderId = UUID.randomUUID().toString();
        
        // 第一阶段:并行校验(用户 + 库存)
        CompletableFuture<UserInfo> userFuture = CompletableFuture
            .supplyAsync(() -> userService.validateUser(request.getUserId()), ioExecutor)
            .orTimeout(1, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                log.warn("用户校验失败,使用默认信息", ex);
                return UserInfo.guest();
            });
        
        CompletableFuture<StockInfo> stockFuture = CompletableFuture
            .supplyAsync(() -> inventoryService.checkStock(request.getItems()), ioExecutor)
            .orTimeout(1, TimeUnit.SECONDS);
        
        // 第二阶段:合并校验结果,创建订单上下文
        CompletableFuture<OrderContext> contextFuture = userFuture
            .thenCombine(stockFuture, (user, stock) -> {
                if (!stock.isAvailable()) {
                    throw new InsufficientStockException("库存不足");
                }
                return new OrderContext(orderId, user, stock, request);
            });
        
        // 第三阶段:支付
        CompletableFuture<PaymentResult> paymentFuture = contextFuture
            .thenCompose(ctx -> CompletableFuture
                .supplyAsync(() -> paymentService.pay(ctx), ioExecutor)
                .orTimeout(5, TimeUnit.SECONDS)
            );
        
        // 第四阶段:支付成功后,并行执行通知和记录
        return paymentFuture
            .thenCompose(payment -> {
                // 异步发送通知(不影响主流程)
                CompletableFuture.runAsync(() -> 
                    notificationService.sendOrderSuccess(orderId, request.getUserId()),
                    ioExecutor
                ).exceptionally(ex -> {
                    log.error("发送通知失败", ex);
                    return null;
                });
                
                // 记录订单
                return CompletableFuture.supplyAsync(() -> 
                    orderRepository.save(orderId, request, payment),
                    ioExecutor
                );
            })
            // 统一结果封装
            .handle((saved, ex) -> {
                if (ex != null) {
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    log.error("订单处理失败: orderId={}", orderId, cause);
                    
                    // 补偿:回滚库存
                    inventoryService.revertStockAsync(request.getItems());
                    
                    if (cause instanceof InsufficientStockException) {
                        return OrderResult.outOfStock(orderId);
                    }
                    if (cause instanceof PaymentException) {
                        return OrderResult.paymentFailed(orderId, cause.getMessage());
                    }
                    if (cause instanceof TimeoutException) {
                        return OrderResult.timeout(orderId);
                    }
                    return OrderResult.systemError(orderId, cause.getMessage());
                }
                
                log.info("订单处理成功: orderId={}", orderId);
                return OrderResult.success(orderId, saved.getTransactionId());
            });
    }
}

1.3 流程图

         ┌─────────────────────────────────────────────────────────────┐
         │                    订单处理流程                              │
         └─────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
              ┌─────────────────────┴─────────────────────┐
              │              第一阶段:并行校验             │
              ├─────────────────────┬─────────────────────┤
              ▼                     ▼                     
      ┌───────────────┐     ┌───────────────┐           
      │  用户校验      │     │  库存检查      │           
      │  (100ms)      │     │  (150ms)      │           
      └───────┬───────┘     └───────┬───────┘           
              │                     │                   
              └──────────┬──────────┘                   
                         ▼                               
              ┌────────────────────┐                    
              │  第二阶段:合并结果  │                    
              │  创建订单上下文     │                    
              └─────────┬──────────┘                    
                        ▼                               
              ┌────────────────────┐                    
              │  第三阶段:支付     │                    
              │  (500ms)           │                    
              └─────────┬──────────┘                    
                        ▼                               
         ┌──────────────┴──────────────┐               
         │    第四阶段:并行后续处理    │               
         ├──────────────┬──────────────┤               
         ▼              ▼                               
   ┌──────────┐  ┌──────────┐                          
   │ 发送通知  │  │ 记录订单  │                          
   │ (异步)   │  │          │                          
   └──────────┘  └────┬─────┘                          
                      ▼                                
              ┌───────────────┐                        
              │  返回结果      │                        
              └───────────────┘                        

二、API 聚合网关

2.1 业务场景

移动端首页需要聚合多个微服务的数据:

  • 用户信息服务
  • 订单服务
  • 优惠券服务
  • 推荐服务

要求:并行调用,设置超时,部分失败不影响整体。

2.2 完整实现

@RestController
@RequestMapping("/api/v1/dashboard")
public class DashboardController {
    
    private final UserClient userClient;
    private final OrderClient orderClient;
    private final CouponClient couponClient;
    private final RecommendClient recommendClient;
    
    private final ExecutorService ioExecutor;
    
    @GetMapping("/{userId}")
    public CompletableFuture<Dashboard> getDashboard(@PathVariable String userId) {
        long startTime = System.currentTimeMillis();
        
        // 并行调用四个服务,每个都有独立的超时和降级
        CompletableFuture<UserProfile> userFuture = fetchWithFallback(
            () -> userClient.getProfile(userId),
            UserProfile.anonymous(),
            "用户服务",
            1000
        );
        
        CompletableFuture<List<OrderSummary>> ordersFuture = fetchWithFallback(
            () -> orderClient.getRecentOrders(userId, 5),
            Collections.emptyList(),
            "订单服务",
            1500
        );
        
        CompletableFuture<List<Coupon>> couponsFuture = fetchWithFallback(
            () -> couponClient.getAvailableCoupons(userId),
            Collections.emptyList(),
            "优惠券服务",
            1000
        );
        
        CompletableFuture<List<Product>> recommendsFuture = fetchWithFallback(
            () -> recommendClient.getRecommendations(userId, 10),
            Collections.emptyList(),
            "推荐服务",
            2000
        );
        
        // 聚合所有结果
        return CompletableFuture.allOf(
            userFuture, ordersFuture, couponsFuture, recommendsFuture
        ).thenApply(v -> {
            long elapsed = System.currentTimeMillis() - startTime;
            log.info("Dashboard 聚合完成,耗时 {}ms", elapsed);
            
            return Dashboard.builder()
                .user(userFuture.join())
                .recentOrders(ordersFuture.join())
                .coupons(couponsFuture.join())
                .recommendations(recommendsFuture.join())
                .build();
        });
    }
    
    /**
     * 通用的带降级的异步调用
     */
    private <T> CompletableFuture<T> fetchWithFallback(
            Supplier<T> supplier,
            T fallback,
            String serviceName,
            long timeoutMs) {
        
        return CompletableFuture
            .supplyAsync(supplier, ioExecutor)
            .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
            .exceptionally(ex -> {
                Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                
                if (cause instanceof TimeoutException) {
                    log.warn("{}调用超时 (>{}ms),使用降级值", serviceName, timeoutMs);
                } else {
                    log.error("{}调用失败,使用降级值", serviceName, cause);
                }
                
                metrics.increment("api.fallback", 
                    "service", serviceName,
                    "reason", cause.getClass().getSimpleName()
                );
                
                return fallback;
            });
    }
}

2.3 性能监控增强

@Component
public class AsyncApiMonitor {
    
    private final MeterRegistry meterRegistry;
    
    /**
     * 带监控的异步调用
     */
    public <T> CompletableFuture<T> monitoredAsync(
            String operationName,
            Supplier<T> supplier,
            ExecutorService executor) {
        
        Timer.Sample sample = Timer.start(meterRegistry);
        
        return CompletableFuture
            .supplyAsync(supplier, executor)
            .whenComplete((result, ex) -> {
                String status = (ex == null) ? "success" : "failure";
                
                sample.stop(Timer.builder("async.operation")
                    .tag("operation", operationName)
                    .tag("status", status)
                    .register(meterRegistry));
                
                if (ex != null) {
                    meterRegistry.counter("async.error",
                        "operation", operationName,
                        "exception", ex.getClass().getSimpleName()
                    ).increment();
                }
            });
    }
}

三、批量任务处理

3.1 业务场景

需要批量处理用户数据,要求:

  • 并行处理提高效率
  • 进度跟踪
  • 错误不影响其他任务
  • 最终汇总结果

3.2 完整实现

@Service
public class BatchProcessor {
    
    private final ExecutorService executor;
    
    public BatchProcessor() {
        this.executor = new ThreadPoolExecutor(
            8, 32, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("batch-%d").build()
        );
    }
    
    /**
     * 批量处理用户数据
     */
    public CompletableFuture<BatchResult> processBatch(
            List<String> userIds,
            Consumer<Progress> progressCallback) {
        
        AtomicInteger completed = new AtomicInteger(0);
        AtomicInteger failed = new AtomicInteger(0);
        
        List<CompletableFuture<UserResult>> futures = userIds.stream()
            .map(userId -> processUserAsync(userId)
                .whenComplete((result, ex) -> {
                    int current = completed.incrementAndGet();
                    if (ex != null || !result.isSuccess()) {
                        failed.incrementAndGet();
                    }
                    
                    // 回调进度
                    progressCallback.accept(new Progress(
                        current,
                        userIds.size(),
                        failed.get()
                    ));
                })
            )
            .collect(Collectors.toList());
        
        // 等待所有任务完成
        return CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> {
                List<UserResult> results = futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
                
                return new BatchResult(
                    userIds.size(),
                    completed.get() - failed.get(),
                    failed.get(),
                    results
                );
            });
    }
    
    /**
     * 处理单个用户(带重试)
     */
    private CompletableFuture<UserResult> processUserAsync(String userId) {
        return CompletableFuture
            .supplyAsync(() -> processUser(userId), executor)
            .orTimeout(5, TimeUnit.SECONDS)
            .exceptionally(ex -> {
                log.error("处理用户失败: {}", userId, ex);
                return UserResult.failure(userId, ex.getMessage());
            });
    }
    
    private UserResult processUser(String userId) {
        // 模拟处理逻辑
        try {
            Thread.sleep(100 + new Random().nextInt(200));
            
            if (Math.random() < 0.05) {
                throw new RuntimeException("随机失败");
            }
            
            return UserResult.success(userId, "处理完成");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}

// 使用
batchProcessor.processBatch(userIds, progress -> {
    log.info("进度: {}/{}, 失败: {}", 
        progress.getCurrent(), 
        progress.getTotal(),
        progress.getFailedCount()
    );
}).thenAccept(result -> {
    log.info("处理完成: 成功={}, 失败={}", 
        result.getSuccessCount(), 
        result.getFailureCount()
    );
});

3.3 分批处理(控制并发)

/**
 * 分批处理,控制并发数
 */
public CompletableFuture<List<UserResult>> processBatchWithConcurrencyLimit(
        List<String> userIds, int batchSize) {
    
    List<List<String>> batches = Lists.partition(userIds, batchSize);
    
    CompletableFuture<List<UserResult>> resultFuture = 
        CompletableFuture.completedFuture(new ArrayList<>());
    
    for (List<String> batch : batches) {
        resultFuture = resultFuture.thenCompose(prevResults -> {
            // 并行处理当前批次
            List<CompletableFuture<UserResult>> batchFutures = batch.stream()
                .map(this::processUserAsync)
                .collect(Collectors.toList());
            
            return CompletableFuture
                .allOf(batchFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    List<UserResult> batchResults = batchFutures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList());
                    
                    List<UserResult> combined = new ArrayList<>(prevResults);
                    combined.addAll(batchResults);
                    return combined;
                });
        });
    }
    
    return resultFuture;
}

四、超时与重试机制

4.1 指数退避重试

public class RetryableService {
    
    private final ExecutorService executor;
    private final ScheduledExecutorService scheduler;
    
    /**
     * 带指数退避的重试
     */
    public <T> CompletableFuture<T> executeWithRetry(
            Supplier<T> supplier,
            int maxRetries,
            long initialDelayMs) {
        
        return executeWithRetryInternal(supplier, maxRetries, initialDelayMs, 0);
    }
    
    private <T> CompletableFuture<T> executeWithRetryInternal(
            Supplier<T> supplier,
            int maxRetries,
            long delayMs,
            int attempt) {
        
        return CompletableFuture
            .supplyAsync(supplier, executor)
            .exceptionally(ex -> {
                if (attempt >= maxRetries) {
                    log.error("重试 {} 次后仍然失败", maxRetries, ex);
                    throw new RuntimeException("超过最大重试次数", ex);
                }
                
                log.warn("第 {} 次尝试失败,{}ms 后重试", 
                    attempt + 1, delayMs, ex);
                
                // 递归重试(注意:这是同步递归,需要改进)
                throw new RuntimeException("需要重试", ex);
            })
            .thenApply(result -> result)
            .exceptionally(ex -> {
                // 延迟后重试
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                
                return executeWithRetryInternal(
                    supplier, 
                    maxRetries, 
                    delayMs * 2,  // 指数退避
                    attempt + 1
                ).join();
            });
    }
    
    /**
     * 改进版:完全异步的重试
     */
    public <T> CompletableFuture<T> executeWithAsyncRetry(
            Supplier<CompletableFuture<T>> futureSupplier,
            int maxRetries,
            long initialDelayMs) {
        
        CompletableFuture<T> result = new CompletableFuture<>();
        
        executeWithAsyncRetryInternal(
            futureSupplier, result, maxRetries, initialDelayMs, 0
        );
        
        return result;
    }
    
    private <T> void executeWithAsyncRetryInternal(
            Supplier<CompletableFuture<T>> futureSupplier,
            CompletableFuture<T> result,
            int maxRetries,
            long delayMs,
            int attempt) {
        
        futureSupplier.get().whenComplete((value, ex) -> {
            if (ex == null) {
                result.complete(value);
                return;
            }
            
            if (attempt >= maxRetries) {
                result.completeExceptionally(
                    new RuntimeException("超过最大重试次数: " + maxRetries, ex)
                );
                return;
            }
            
            log.warn("第 {} 次尝试失败,{}ms 后重试", attempt + 1, delayMs);
            
            // 延迟后异步重试
            scheduler.schedule(() -> {
                executeWithAsyncRetryInternal(
                    futureSupplier, result, maxRetries, delayMs * 2, attempt + 1
                );
            }, delayMs, TimeUnit.MILLISECONDS);
        });
    }
}

4.2 熔断器模式

public class CircuitBreaker {
    
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final AtomicLong lastFailureTime = new AtomicLong(0);
    
    private final int threshold;
    private final long resetTimeMs;
    
    public CircuitBreaker(int threshold, long resetTimeMs) {
        this.threshold = threshold;
        this.resetTimeMs = resetTimeMs;
    }
    
    public <T> CompletableFuture<T> execute(
            Supplier<CompletableFuture<T>> supplier,
            T fallback) {
        
        // 检查熔断状态
        if (isOpen()) {
            log.warn("熔断器开启,返回降级值");
            return CompletableFuture.completedFuture(fallback);
        }
        
        return supplier.get()
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    recordFailure();
                } else {
                    reset();
                }
            })
            .exceptionally(ex -> {
                log.warn("调用失败,返回降级值", ex);
                return fallback;
            });
    }
    
    private boolean isOpen() {
        if (failureCount.get() >= threshold) {
            long elapsed = System.currentTimeMillis() - lastFailureTime.get();
            if (elapsed < resetTimeMs) {
                return true;
            }
            // 半开状态,尝试恢复
            failureCount.set(threshold / 2);
        }
        return false;
    }
    
    private void recordFailure() {
        failureCount.incrementAndGet();
        lastFailureTime.set(System.currentTimeMillis());
    }
    
    private void reset() {
        failureCount.set(0);
    }
}

// 使用
CircuitBreaker breaker = new CircuitBreaker(5, 30000);

CompletableFuture<String> result = breaker.execute(
    () -> httpClient.getAsync(url),
    "降级值"
);

五、最佳实践总结

5.1 设计原则

原则 说明
独立异常处理 每个任务独立处理异常,避免连锁失败
设置超时 所有远程调用必须设置超时
提供降级 非核心功能失败时返回默认值
日志追踪 记录关键节点,便于排查问题
资源隔离 不同类型任务使用不同线程池

5.2 性能优化建议

  1. 并行化:识别独立任务,使用 thenCombineallOf 并行执行
  2. 超时控制:合理设置超时,避免长时间阻塞
  3. 批量处理:控制并发数,避免压垮下游服务
  4. 连接池:HTTP/数据库连接池预热,减少冷启动延迟

5.3 常见问题排查

// 问题1:任务没有执行
// 原因:可能使用了默认 ForkJoinPool,线程被阻塞
// 解决:使用自定义线程池

// 问题2:异常被吞掉
// 原因:没有添加异常处理
// 解决:使用 exceptionally 或 handle

// 问题3:内存溢出
// 原因:创建太多 CompletableFuture
// 解决:分批处理,限制并发数

// 问题4:响应慢
// 原因:串行调用,没有并行化
// 解决:使用 thenCombine 或 allOf 并行

六、总结

通过本文的实战案例,你应该掌握了:

  1. 订单处理:并行校验 → 支付 → 并行后续处理
  2. API 聚合:并行调用 + 独立超时 + 独立降级
  3. 批量处理:进度跟踪 + 错误隔离 + 结果汇总
  4. 重试机制:指数退避 + 熔断器

核心思想:将复杂流程拆解为独立任务,并行执行提升效率,独立处理异常保证稳定性。


上一篇回顾《CompletableFuture 组合与异常处理:构建复杂异步流》
我们深入探讨如何组合多个 CompletableFuture,以及优雅处理异步异常的最佳实践。
下一篇预告
《CompletableFuture 线程池与最佳实践:生产环境调优指南》
我们将深入探讨线程池配置、性能调优和生产环境最佳实践。

Logo

电商企业物流数字化转型必备!快递鸟 API 接口,72 小时快速完成物流系统集成。全流程实战1V1指导,营造开放的API技术生态圈。

更多推荐