微服务分布式事务一致性:Seata AT 模式与消息最终一致的深度对比
微服务分布式事务一致性:Seata AT 模式与消息最终一致的深度对比

一、跨服务数据不一致的根源:从单体事务到分布式裂变
单体应用时代,数据库事务通过 ACID 特性保证数据一致性,一条 @Transactional 注解即可覆盖所有操作。但微服务架构将单体拆分为多个独立服务,每个服务拥有自己的数据库实例。一个业务操作(如订单创建)需要同时写入订单库、扣减库存库、扣减账户余额,这三个操作分布在三个不同的数据库实例上,无法共享同一个数据库事务。
分布式事务的核心矛盾在于:强一致性(CP)与高可用性(AP)不可兼得。2PC 协议虽然能保证强一致性,但在协调者宕机时所有参与者被锁定,可用性极差。TCC 模式需要为每个操作编写 Try/Confirm/Cancel 三个方法,开发成本极高。如何在一致性与可用性之间找到平衡点,是分布式事务方案选型的核心命题。
二、两种主流方案的底层机制:Seata AT 与消息最终一致
2.1 Seata AT 模式的两阶段提交机制
Seata AT 模式是对 2PC 的业务无侵入式改造。第一阶段拦截 SQL 执行,生成前镜像(Before Image)和后镜像(After Image),将镜像数据写入 undo_log 表,然后提交本地事务。第二阶段由 TC(事务协调者)根据各分支事务的执行结果,决定全局提交或回滚。全局提交时异步清理 undo_log;全局回滚时根据 undo_log 中的前镜像反向补偿。
sequenceDiagram
participant TM as 事务管理器(TM)
participant TC as 事务协调者(TC)
participant RM1 as 订单服务(RM)
participant RM2 as 库存服务(RM)
participant RM3 as 账户服务(RM)
TM->>TC: 开启全局事务(XID)
TC-->>TM: 返回XID
Note over RM1,RM3: 第一阶段:执行业务SQL + 生成镜像
TM->>RM1: 传递XID,执行订单创建
RM1->>RM1: 生成Before/After Image
RM1->>RM1: 提交本地事务 + 写入undo_log
RM1-->>TC: 分支事务注册(一阶段成功)
TM->>RM2: 传递XID,执行库存扣减
RM2->>RM2: 生成Before/After Image
RM2->>RM2: 提交本地事务 + 写入undo_log
RM2-->>TC: 分支事务注册(一阶段成功)
TM->>RM3: 传递XID,执行余额扣减
RM3-->>TC: 分支事务注册(一阶段失败)
Note over RM1,RM3: 第二阶段:全局回滚
TC->>RM1: 发送回滚指令
RM1->>RM1: 读取undo_log,反向补偿
RM1-->>TC: 回滚完成
TC->>RM2: 发送回滚指令
RM2->>RM2: 读取undo_log,反向补偿
RM2-->>TC: 回滚完成
2.2 消息最终一致性的本地消息表机制
消息最终一致性方案的核心思路是:将分布式事务拆解为多个本地事务,通过消息的可靠投递和幂等消费保证最终一致性。本地消息表是其中的关键设计——业务操作与消息写入在同一个本地事务中完成,确保"业务执行"与"消息产生"的原子性。
flowchart LR
subgraph 订单服务
A[创建订单] --> B[写入本地消息表<br/>同一本地事务]
end
subgraph 消息投递
B --> C[定时任务扫描<br/>未投递消息]
C --> D[发送到MQ]
D -->|成功| E[标记已投递]
D -->|失败| F[等待下次重试]
F --> C
end
subgraph 库存服务
D --> G[消费消息]
G --> H[幂等校验]
H --> I[执行库存扣减]
I --> J[确认消费]
end
style B fill:#e74c3c,color:#fff
style H fill:#27ae60,color:#fff
三、生产级代码实现
3.1 Seata AT 模式集成
/**
* 订单服务 - Seata AT 模式全局事务入口
* @GlobalTransactional 注解由 Seata 提供,自动开启全局事务
* XID 通过 RPC 请求头透传到下游服务
*/
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private InventoryClient inventoryClient;
@Autowired
private AccountClient accountClient;
/**
* 创建订单:全局事务覆盖订单创建、库存扣减、余额扣减
* timeoutMills 设为 60000,因为涉及三个服务的 RPC 调用
* 默认的 30 秒在跨服务调用链路较长时容易超时
*/
@GlobalTransactional(timeoutMills = 60000, name = "create-order")
public Order createOrder(OrderDTO orderDTO) {
// 1. 创建订单记录
Order order = new Order();
order.setUserId(orderDTO.getUserId());
order.setCommodityCode(orderDTO.getCommodityCode());
order.setCount(orderDTO.getCount());
order.setMoney(orderDTO.getMoney());
order.setStatus("INIT");
orderMapper.insert(order);
// 2. 远程调用库存服务扣减库存
// Seata 通过拦截 Feign 请求,将 XID 透传到下游
inventoryClient.deduct(
orderDTO.getCommodityCode(), orderDTO.getCount()
);
// 3. 远程调用账户服务扣减余额
accountClient.debit(
orderDTO.getUserId(), orderDTO.getMoney()
);
// 4. 更新订单状态
order.setStatus("SUCCESS");
orderMapper.updateById(order);
return order;
}
}
3.2 本地消息表实现最终一致性
/**
* 订单服务 - 本地消息表方案
* 核心设计:业务操作与消息写入在同一本地事务中完成
* 消息投递由异步定时任务负责,支持失败重试
*/
@Service
@Slf4j
public class OrderServiceWithMessage {
@Autowired
private OrderMapper orderMapper;
@Autowired
private MessageTableMapper messageTableMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 创建订单 + 写入本地消息表
* 两个操作在同一个本地事务中,保证原子性
* 如果消息写入失败,整个事务回滚,订单也不会创建
*/
@Transactional(rollbackFor = Exception.class)
public Order createOrderWithMessage(OrderDTO orderDTO) {
// 1. 创建订单
Order order = new Order();
order.setUserId(orderDTO.getUserId());
order.setCommodityCode(orderDTO.getCommodityCode());
order.setCount(orderDTO.getCount());
order.setMoney(orderDTO.getMoney());
order.setStatus("INIT");
orderMapper.insert(order);
// 2. 写入本地消息表
// 消息内容为库存扣减所需的参数
// 状态为待投递,由定时任务扫描后发送到 MQ
MessageTable message = new MessageTable();
message.setTopic("inventory-deduct");
message.setMessageKey("order-" + order.getId());
message.setMessageBody(
JSON.toJSONString(Map.of(
"commodityCode", orderDTO.getCommodityCode(),
"count", orderDTO.getCount(),
"orderId", order.getId()
))
);
message.setStatus("PENDING");
message.setRetryCount(0);
message.setNextRetryTime(LocalDateTime.now());
messageTableMapper.insert(message);
return order;
}
}
/**
* 消息投递定时任务
* 扫描本地消息表中待投递的消息,发送到 MQ
* 投递成功后标记为已投递,失败则增加重试次数
* 重试间隔采用指数退避策略,避免消息风暴
*/
@Component
@Slf4j
public class MessagePublishScheduler {
@Autowired
private MessageTableMapper messageTableMapper;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Scheduled(fixedDelay = 5000)
public void publishPendingMessages() {
// 查询待投递且到达重试时间的消息
List<MessageTable> messages = messageTableMapper
.selectPendingMessages(LocalDateTime.now(), 100);
for (MessageTable msg : messages) {
try {
rocketMQTemplate.syncSend(
msg.getTopic(),
MessageBuilder.withPayload(msg.getMessageBody())
.setKeys(msg.getMessageKey())
.build()
);
// 投递成功,标记为已投递
msg.setStatus("PUBLISHED");
messageTableMapper.updateById(msg);
} catch (Exception e) {
log.error("消息投递失败, messageId={}", msg.getId(), e);
// 指数退避:下次重试时间 = 当前时间 + 2^retryCount * 基础间隔
int nextDelay = (int) Math.pow(2, msg.getRetryCount()) * 5;
msg.setRetryCount(msg.getRetryCount() + 1);
msg.setNextRetryTime(
LocalDateTime.now().plusSeconds(nextDelay)
);
// 超过最大重试次数,标记为死信,人工介入
if (msg.getRetryCount() > 10) {
msg.setStatus("DEAD_LETTER");
}
messageTableMapper.updateById(msg);
}
}
}
}
/**
* 库存服务 - 消息消费端
* 核心设计:幂等消费,防止消息重复投递导致库存重复扣减
*/
@Component
@RocketMQMessageListener(
topic = "inventory-deduct",
consumerGroup = "inventory-consumer-group"
)
@Slf4j
public class InventoryDeductConsumer
implements RocketMQListener<String> {
@Autowired
private InventoryMapper inventoryMapper;
@Autowired
private DeductRecordMapper deductRecordMapper;
@Override
public void onMessage(String message) {
Map<String, Object> params = JSON.parseObject(message, Map.class);
String orderId = (String) params.get("orderId");
// 幂等校验:如果该订单已扣减过,直接返回
// 这是最终一致性方案的关键保障,MQ 可能重复投递消息
DeductRecord existing = deductRecordMapper
.selectByOrderId(orderId);
if (existing != null) {
log.info("订单已扣减, 跳过, orderId={}", orderId);
return;
}
// 执行库存扣减
String commodityCode = (String) params.get("commodityCode");
Integer count = (Integer) params.get("count");
int updated = inventoryMapper.deductStock(
commodityCode, count
);
if (updated == 0) {
// 库存不足,记录异常,由人工处理
// 不抛异常避免 MQ 无限重试
log.error("库存扣减失败, 库存不足, commodityCode={}",
commodityCode);
return;
}
// 记录扣减流水,作为幂等校验的依据
DeductRecord record = new DeductRecord();
record.setOrderId(orderId);
record.setCommodityCode(commodityCode);
record.setDeductCount(count);
deductRecordMapper.insert(record);
}
}
四、方案选型权衡:一致性强度与性能吞吐的博弈
两种方案各有明确的适用边界,选型时需从三个维度评估。
一致性强度:Seata AT 模式提供读已提交级别的全局一致性,在全局事务提交前,其他事务可以读到分支事务已提交的中间状态(脏读问题)。消息最终一致性只能保证最终一致,中间状态窗口可能持续数秒到数分钟。对一致性要求极高的金融场景(如转账),AT 模式更合适;对一致性要求可容忍短时延迟的电商场景(如下单扣库存),消息方案更合适。
性能吞吐:Seata AT 模式的一阶段需要生成 undo_log 并写入数据库,增加了约 20-30% 的数据库写入开销。全局锁机制在高并发场景下可能成为瓶颈——当多个全局事务争抢同一行数据的全局锁时,后到的事务需要等待。消息方案没有全局锁,吞吐量更高,但代价是延迟窗口内的数据不一致。
运维复杂度:Seata 需要部署独立的 TC Server 集群,TC 是单点依赖——TC 宕机后所有全局事务无法提交或回滚。消息方案依赖 MQ 的可靠性,需要处理消息积压、消费延迟等运维问题。从故障影响范围看,Seata TC 故障影响面更大(所有使用 AT 模式的服务不可用),MQ 故障影响面更小(仅影响消息投递,业务仍可正常写入本地消息表)。
五、总结
分布式事务方案没有银弹,选型的核心是明确业务对一致性的容忍度。Seata AT 模式适合对一致性要求高、并发量中等的场景(如金融转账、订单支付),通过全局锁和 undo_log 保证强一致回滚。消息最终一致性适合对一致性容忍短时延迟、并发量高的场景(如电商下单、积分发放),通过本地消息表和幂等消费保证最终一致。
落地路线建议:第一步,梳理业务场景,按一致性要求分级——强一致场景用 Seata AT,弱一致场景用消息方案;第二步,搭建 Seata TC Server 集群(至少 3 节点),配置数据库存储模式确保 TC 高可用;第三步,实现本地消息表组件,封装消息写入、投递、重试的通用逻辑;第四步,为所有消息消费端实现幂等校验,这是最终一致性方案的底线保障;第五步,建立分布式事务监控看板,追踪全局事务成功率和消息投递延迟。
更多推荐


所有评论(0)