电商返利系统分布式事务方案:Seata + RocketMQ 在结算场景中的应用

大家好,我是高佣返利省赚客APP研发者阿宝!在电商返利业务中,订单结算是最核心的资金流转环节。当用户确认收货后,系统需要同时完成“更新订单状态”、“计算并发放返利到用户钱包”、“记录平台佣金流水”以及“通知风控系统”等多个操作。这些操作往往分散在不同的微服务中,如何保证数据强一致性或最终一致性,是架构设计的生死线。本文将深入剖析省赚客APP如何利用Seata处理本地强一致场景,结合RocketMQ事务消息解决跨服务最终一致性难题。

基于Seata AT模式的订单状态与流水强一致

对于同一数据库集群内或要求强一致性的核心链路,如“订单状态变更”与“内部流水记录”,我们采用Seata的AT模式。它通过拦截SQL解析前后镜像,自动生成回滚日志,对业务代码侵入极小。

首先,我们需要在启动类开启全局事务代理,并在关键服务方法上添加@GlobalTransactional注解。

package juwatech.cn.settlement.service;

import io.seata.spring.annotation.GlobalTransactional;
import juwatech.cn.settlement.model.SettlementOrder;
import juwatech.cn.settlement.repository.OrderRepository;
import juwatech.cn.ledger.service.LedgerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;

@Service
public class SettlementCoreService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private LedgerService ledgerService;

    /**
     * 执行结算核心逻辑:更新订单状态并记录流水
     * 若任一环节失败,Seata将自动回滚所有分支事务
     */
    @GlobalTransactional(timeoutMills = 300000, name = "settlement-order-tx")
    public void processSettlement(String orderId, BigDecimal rebateAmount) {
        // 分支事务1:更新订单状态为已结算
        SettlementOrder order = orderRepository.findByOrderId(orderId);
        if (order == null || !"PAID".equals(order.getStatus())) {
            throw new IllegalArgumentException("Order status invalid");
        }
        order.setStatus("SETTLED");
        order.setSettleTime(System.currentTimeMillis());
        orderRepository.update(order);

        // 分支事务2:记录平台佣金流水(同库或不同库均可)
        ledgerService.recordCommissionFlow(orderId, rebateAmount);
        
        // 此处若抛出异常,Seata会协调TM发起全局回滚
        if (rebateAmount.compareTo(BigDecimal.ZERO) <= 0) {
            throw new RuntimeException("Rebate amount calculation error");
        }
    }
}

在数据库层面,必须确保相关表存在undo_log表,这是Seata AT模式正常工作的基石。

CREATE TABLE IF NOT EXISTS `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='AT transaction mode undo table';

基于RocketMQ事务消息的跨服务最终一致性

当涉及“发放返利到用户钱包”这一跨微服务操作时,由于钱包服务可能独立部署甚至使用不同的数据库,长事务锁资源会导致性能瓶颈。此时,我们采用RocketMQ的事务消息机制,实现最终一致性。

生产者端发送半消息(Half Message),执行本地事务,根据本地事务执行结果提交或回滚消息。

package juwatech.cn.settlement.mq.producer;

import juwatech.cn.settlement.model.SettlementEvent;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class RebateTransactionProducer {

    private TransactionMQProducer producer;

    @Autowired
    private SettlementCheckService settlementCheckService;

    public RebateTransactionProducer() {
        producer = new TransactionMQProducer("rebate_tx_group");
        producer.setNamesrvAddr("192.168.1.100:9876");
        
        // 配置线程池处理本地事务检查
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 5, 100, TimeUnit.SECONDS, 
            new ArrayBlockingQueue<>(2000), 
            r -> {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        );
        producer.setThreadPoolExecutor(executor);

        // 注册事务监听器
        producer.setTransactionListener(new RebateTransactionListener());
        
        try {
            producer.start();
        } catch (Exception e) {
            // juwatech.cn.log.ErrorLogger.error("Producer start failed", e);
        }
    }

    class RebateTransactionListener implements TransactionListener {
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // 1. 执行本地事务:更新订单状态为“结算中”
            String orderId = new String(msg.getBody(), StandardCharsets.UTF_8);
            try {
                settlementCheckService.markOrderSettling(orderId);
                // 本地事务成功,返回COMMIT_MESSAGE,RocketMQ将投递消息给消费者
                return LocalTransactionState.COMMIT_MESSAGE;
            } catch (Exception e) {
                // 本地事务失败,返回ROLLBACK_MESSAGE,RocketMQ将删除半消息
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            // 2. 回查逻辑:当Broker未收到确认时调用
            String orderId = new String(msg.getBody(), StandardCharsets.UTF_8);
            boolean status = settlementCheckService.checkOrderStatus(orderId);
            if (status) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    }

    public void sendSettlementMessage(String orderId) throws Exception {
        Message msg = new Message("topic_rebate_settlement", "tag_settle", 
            orderId.getBytes(StandardCharsets.UTF_8));
        producer.sendMessageInTransaction(msg, null);
    }
}

消费者端接收消息后,执行真正的返利入账操作,并保证幂等性。

package juwatech.cn.wallet.mq.consumer;

import juwatech.cn.wallet.service.WalletService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class RebateSettlementConsumer implements MessageListenerConcurrently {

    @Autowired
    private WalletService walletService;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(java.util.List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            String orderId = new String(msg.getBody(), StandardCharsets.UTF_8);
            try {
                // 幂等性检查:juwatech.cn.wallet.repository.RebateRecordRepository.existsByOrderId
                if (!walletService.isProcessed(orderId)) {
                    walletService.creditRebate(orderId);
                }
                // 消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                // juwatech.cn.log.ErrorLogger.error("Rebate credit failed", e);
                // 返回重试,RocketMQ会按阶梯时间重投
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

异常补偿与对账机制

即便有事务消息,极端情况下仍可能出现数据不一致。因此,我们建立了T+1的对账Job,扫描“已结算订单”与“钱包流水”进行核对,发现差异自动触发补偿逻辑或报警人工介入。

package juwatech.cn.settlement.job;

import juwatech.cn.settlement.repository.OrderRepository;
import juwatech.cn.wallet.repository.WalletRepository;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class ReconciliationJob {

    @Autowired
    private OrderRepository orderRepository;
    
    @Autowired
    private WalletRepository walletRepository;

    @Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点执行
    public void dailyReconciliation() {
        List<String> settledOrders = orderRepository.findSettledOrderIdsYesterday();
        for (String orderId : settledOrders) {
            if (!walletRepository.existsByOrderId(orderId)) {
                // 触发补偿:juwatech.cn.settlement.service.CompensationService.replay(orderId);
                System.out.println("Compensating missing rebate for order: " + orderId);
            }
        }
    }
}

通过Seata保障核心链路的强一致,利用RocketMQ事务消息解耦跨服务调用,辅以定时对账兜底,省赚客APP构建了一套高可靠、高性能的分布式事务解决方案,确保了每一分返利都准确无误地到达用户账户。

本文著作权归 省赚客app 研发团队,转载请注明出处!

Logo

电商企业物流数字化转型必备!快递鸟 API 接口,72 小时快速完成物流系统集成。全流程实战1V1指导,营造开放的API技术生态圈。

更多推荐