微服务分布式事务一致性:Seata AT 模式与消息最终一致的深度对比

cover

一、跨服务数据不一致的根源:从单体事务到分布式裂变

单体应用时代,数据库事务通过 ACID 特性保证数据一致性,一条 @Transactional 注解即可覆盖所有操作。但微服务架构将单体拆分为多个独立服务,每个服务拥有自己的数据库实例。一个业务操作(如订单创建)需要同时写入订单库、扣减库存库、扣减账户余额,这三个操作分布在三个不同的数据库实例上,无法共享同一个数据库事务。

分布式事务的核心矛盾在于:强一致性(CP)与高可用性(AP)不可兼得。2PC 协议虽然能保证强一致性,但在协调者宕机时所有参与者被锁定,可用性极差。TCC 模式需要为每个操作编写 Try/Confirm/Cancel 三个方法,开发成本极高。如何在一致性与可用性之间找到平衡点,是分布式事务方案选型的核心命题。

二、两种主流方案的底层机制:Seata AT 与消息最终一致

2.1 Seata AT 模式的两阶段提交机制

Seata AT 模式是对 2PC 的业务无侵入式改造。第一阶段拦截 SQL 执行,生成前镜像(Before Image)和后镜像(After Image),将镜像数据写入 undo_log 表,然后提交本地事务。第二阶段由 TC(事务协调者)根据各分支事务的执行结果,决定全局提交或回滚。全局提交时异步清理 undo_log;全局回滚时根据 undo_log 中的前镜像反向补偿。

sequenceDiagram
    participant TM as 事务管理器(TM)
    participant TC as 事务协调者(TC)
    participant RM1 as 订单服务(RM)
    participant RM2 as 库存服务(RM)
    participant RM3 as 账户服务(RM)

    TM->>TC: 开启全局事务(XID)
    TC-->>TM: 返回XID

    Note over RM1,RM3: 第一阶段:执行业务SQL + 生成镜像
    TM->>RM1: 传递XID,执行订单创建
    RM1->>RM1: 生成Before/After Image
    RM1->>RM1: 提交本地事务 + 写入undo_log
    RM1-->>TC: 分支事务注册(一阶段成功)

    TM->>RM2: 传递XID,执行库存扣减
    RM2->>RM2: 生成Before/After Image
    RM2->>RM2: 提交本地事务 + 写入undo_log
    RM2-->>TC: 分支事务注册(一阶段成功)

    TM->>RM3: 传递XID,执行余额扣减
    RM3-->>TC: 分支事务注册(一阶段失败)

    Note over RM1,RM3: 第二阶段:全局回滚
    TC->>RM1: 发送回滚指令
    RM1->>RM1: 读取undo_log,反向补偿
    RM1-->>TC: 回滚完成

    TC->>RM2: 发送回滚指令
    RM2->>RM2: 读取undo_log,反向补偿
    RM2-->>TC: 回滚完成

2.2 消息最终一致性的本地消息表机制

消息最终一致性方案的核心思路是:将分布式事务拆解为多个本地事务,通过消息的可靠投递和幂等消费保证最终一致性。本地消息表是其中的关键设计——业务操作与消息写入在同一个本地事务中完成,确保"业务执行"与"消息产生"的原子性。

flowchart LR
    subgraph 订单服务
        A[创建订单] --> B[写入本地消息表<br/>同一本地事务]
    end

    subgraph 消息投递
        B --> C[定时任务扫描<br/>未投递消息]
        C --> D[发送到MQ]
        D -->|成功| E[标记已投递]
        D -->|失败| F[等待下次重试]
        F --> C
    end

    subgraph 库存服务
        D --> G[消费消息]
        G --> H[幂等校验]
        H --> I[执行库存扣减]
        I --> J[确认消费]
    end

    style B fill:#e74c3c,color:#fff
    style H fill:#27ae60,color:#fff

三、生产级代码实现

3.1 Seata AT 模式集成

/**
 * 订单服务 - Seata AT 模式全局事务入口
 * @GlobalTransactional 注解由 Seata 提供,自动开启全局事务
 * XID 通过 RPC 请求头透传到下游服务
 */
@Service
public class OrderService {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private InventoryClient inventoryClient;

    @Autowired
    private AccountClient accountClient;

    /**
     * 创建订单:全局事务覆盖订单创建、库存扣减、余额扣减
     * timeoutMills 设为 60000,因为涉及三个服务的 RPC 调用
     * 默认的 30 秒在跨服务调用链路较长时容易超时
     */
    @GlobalTransactional(timeoutMills = 60000, name = "create-order")
    public Order createOrder(OrderDTO orderDTO) {
        // 1. 创建订单记录
        Order order = new Order();
        order.setUserId(orderDTO.getUserId());
        order.setCommodityCode(orderDTO.getCommodityCode());
        order.setCount(orderDTO.getCount());
        order.setMoney(orderDTO.getMoney());
        order.setStatus("INIT");
        orderMapper.insert(order);

        // 2. 远程调用库存服务扣减库存
        // Seata 通过拦截 Feign 请求,将 XID 透传到下游
        inventoryClient.deduct(
            orderDTO.getCommodityCode(), orderDTO.getCount()
        );

        // 3. 远程调用账户服务扣减余额
        accountClient.debit(
            orderDTO.getUserId(), orderDTO.getMoney()
        );

        // 4. 更新订单状态
        order.setStatus("SUCCESS");
        orderMapper.updateById(order);

        return order;
    }
}

3.2 本地消息表实现最终一致性

/**
 * 订单服务 - 本地消息表方案
 * 核心设计:业务操作与消息写入在同一本地事务中完成
 * 消息投递由异步定时任务负责,支持失败重试
 */
@Service
@Slf4j
public class OrderServiceWithMessage {

    @Autowired
    private OrderMapper orderMapper;

    @Autowired
    private MessageTableMapper messageTableMapper;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 创建订单 + 写入本地消息表
     * 两个操作在同一个本地事务中,保证原子性
     * 如果消息写入失败,整个事务回滚,订单也不会创建
     */
    @Transactional(rollbackFor = Exception.class)
    public Order createOrderWithMessage(OrderDTO orderDTO) {
        // 1. 创建订单
        Order order = new Order();
        order.setUserId(orderDTO.getUserId());
        order.setCommodityCode(orderDTO.getCommodityCode());
        order.setCount(orderDTO.getCount());
        order.setMoney(orderDTO.getMoney());
        order.setStatus("INIT");
        orderMapper.insert(order);

        // 2. 写入本地消息表
        // 消息内容为库存扣减所需的参数
        // 状态为待投递,由定时任务扫描后发送到 MQ
        MessageTable message = new MessageTable();
        message.setTopic("inventory-deduct");
        message.setMessageKey("order-" + order.getId());
        message.setMessageBody(
            JSON.toJSONString(Map.of(
                "commodityCode", orderDTO.getCommodityCode(),
                "count", orderDTO.getCount(),
                "orderId", order.getId()
            ))
        );
        message.setStatus("PENDING");
        message.setRetryCount(0);
        message.setNextRetryTime(LocalDateTime.now());
        messageTableMapper.insert(message);

        return order;
    }
}

/**
 * 消息投递定时任务
 * 扫描本地消息表中待投递的消息,发送到 MQ
 * 投递成功后标记为已投递,失败则增加重试次数
 * 重试间隔采用指数退避策略,避免消息风暴
 */
@Component
@Slf4j
public class MessagePublishScheduler {

    @Autowired
    private MessageTableMapper messageTableMapper;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Scheduled(fixedDelay = 5000)
    public void publishPendingMessages() {
        // 查询待投递且到达重试时间的消息
        List<MessageTable> messages = messageTableMapper
            .selectPendingMessages(LocalDateTime.now(), 100);

        for (MessageTable msg : messages) {
            try {
                rocketMQTemplate.syncSend(
                    msg.getTopic(),
                    MessageBuilder.withPayload(msg.getMessageBody())
                        .setKeys(msg.getMessageKey())
                        .build()
                );
                // 投递成功,标记为已投递
                msg.setStatus("PUBLISHED");
                messageTableMapper.updateById(msg);
            } catch (Exception e) {
                log.error("消息投递失败, messageId={}", msg.getId(), e);
                // 指数退避:下次重试时间 = 当前时间 + 2^retryCount * 基础间隔
                int nextDelay = (int) Math.pow(2, msg.getRetryCount()) * 5;
                msg.setRetryCount(msg.getRetryCount() + 1);
                msg.setNextRetryTime(
                    LocalDateTime.now().plusSeconds(nextDelay)
                );
                // 超过最大重试次数,标记为死信,人工介入
                if (msg.getRetryCount() > 10) {
                    msg.setStatus("DEAD_LETTER");
                }
                messageTableMapper.updateById(msg);
            }
        }
    }
}

/**
 * 库存服务 - 消息消费端
 * 核心设计:幂等消费,防止消息重复投递导致库存重复扣减
 */
@Component
@RocketMQMessageListener(
    topic = "inventory-deduct",
    consumerGroup = "inventory-consumer-group"
)
@Slf4j
public class InventoryDeductConsumer
    implements RocketMQListener<String> {

    @Autowired
    private InventoryMapper inventoryMapper;

    @Autowired
    private DeductRecordMapper deductRecordMapper;

    @Override
    public void onMessage(String message) {
        Map<String, Object> params = JSON.parseObject(message, Map.class);
        String orderId = (String) params.get("orderId");

        // 幂等校验:如果该订单已扣减过,直接返回
        // 这是最终一致性方案的关键保障,MQ 可能重复投递消息
        DeductRecord existing = deductRecordMapper
            .selectByOrderId(orderId);
        if (existing != null) {
            log.info("订单已扣减, 跳过, orderId={}", orderId);
            return;
        }

        // 执行库存扣减
        String commodityCode = (String) params.get("commodityCode");
        Integer count = (Integer) params.get("count");
        int updated = inventoryMapper.deductStock(
            commodityCode, count
        );

        if (updated == 0) {
            // 库存不足,记录异常,由人工处理
            // 不抛异常避免 MQ 无限重试
            log.error("库存扣减失败, 库存不足, commodityCode={}",
                commodityCode);
            return;
        }

        // 记录扣减流水,作为幂等校验的依据
        DeductRecord record = new DeductRecord();
        record.setOrderId(orderId);
        record.setCommodityCode(commodityCode);
        record.setDeductCount(count);
        deductRecordMapper.insert(record);
    }
}

四、方案选型权衡:一致性强度与性能吞吐的博弈

两种方案各有明确的适用边界,选型时需从三个维度评估。

一致性强度:Seata AT 模式提供读已提交级别的全局一致性,在全局事务提交前,其他事务可以读到分支事务已提交的中间状态(脏读问题)。消息最终一致性只能保证最终一致,中间状态窗口可能持续数秒到数分钟。对一致性要求极高的金融场景(如转账),AT 模式更合适;对一致性要求可容忍短时延迟的电商场景(如下单扣库存),消息方案更合适。

性能吞吐:Seata AT 模式的一阶段需要生成 undo_log 并写入数据库,增加了约 20-30% 的数据库写入开销。全局锁机制在高并发场景下可能成为瓶颈——当多个全局事务争抢同一行数据的全局锁时,后到的事务需要等待。消息方案没有全局锁,吞吐量更高,但代价是延迟窗口内的数据不一致。

运维复杂度:Seata 需要部署独立的 TC Server 集群,TC 是单点依赖——TC 宕机后所有全局事务无法提交或回滚。消息方案依赖 MQ 的可靠性,需要处理消息积压、消费延迟等运维问题。从故障影响范围看,Seata TC 故障影响面更大(所有使用 AT 模式的服务不可用),MQ 故障影响面更小(仅影响消息投递,业务仍可正常写入本地消息表)。

五、总结

分布式事务方案没有银弹,选型的核心是明确业务对一致性的容忍度。Seata AT 模式适合对一致性要求高、并发量中等的场景(如金融转账、订单支付),通过全局锁和 undo_log 保证强一致回滚。消息最终一致性适合对一致性容忍短时延迟、并发量高的场景(如电商下单、积分发放),通过本地消息表和幂等消费保证最终一致。

落地路线建议:第一步,梳理业务场景,按一致性要求分级——强一致场景用 Seata AT,弱一致场景用消息方案;第二步,搭建 Seata TC Server 集群(至少 3 节点),配置数据库存储模式确保 TC 高可用;第三步,实现本地消息表组件,封装消息写入、投递、重试的通用逻辑;第四步,为所有消息消费端实现幂等校验,这是最终一致性方案的底线保障;第五步,建立分布式事务监控看板,追踪全局事务成功率和消息投递延迟。

Logo

电商企业物流数字化转型必备!快递鸟 API 接口,72 小时快速完成物流系统集成。全流程实战1V1指导,营造开放的API技术生态圈。

更多推荐