RocketMQ 从入门到实战 + 高频面试八股

RocketMQ 是阿里开源的分布式消息中间件,基于 Java 开发,主打高吞吐、低延迟、高可用,兼容 Kafka 协议,广泛用于电商、金融等核心业务场景。本文涵盖核心使用流程 + 高频面试考点,兼顾实战与面试需求。

一、核心依赖

<!-- RocketMQ 客户端核心依赖 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.7</version> <!-- 与服务端版本保持一致 -->
</dependency>
<!-- 可选:日志依赖 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>2.0.9</version>
</dependency>

二、核心概念(极简记忆)

术语 作用
Topic 消息主题(分类),生产者发消息到 Topic,消费者订阅 Topic
Message Queue Topic 的物理分片(对应 Kafka 的 Partition),每个 Topic 包含多个队列
Broker 消息服务器,负责存储、转发消息,分为 Master/Slave 架构
NameServer 命名服务,管理 Broker 路由信息,生产者 / 消费者通过它获取 Broker 地址
Producer 生产者:发送消息到 Broker
Consumer 消费者:订阅 Topic 并消费消息,分为 PushConsumer/PullConsumer
Group 分组标识:ProducerGroup(生产者分组)/ConsumerGroup(消费者分组)
Offset 队列内消息的偏移量,消费者通过 Offset 记录消费位置

三、核心实现:生产者(同步 / 异步 / 单向)

1. 同步发送(可靠场景,如订单创建)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * RocketMQ 同步生产者(等待发送结果,保证消息可靠)
 */
public class SyncProducer {
    // 生产者分组(同一组内生产者共享配置)
    private static final String PRODUCER_GROUP = "sync_producer_group";
    // NameServer 地址(多个用分号分隔)
    private static final String NAMESRV_ADDR = "127.0.0.1:9876";
    // 目标 Topic
    private static final String TOPIC_NAME = "test_topic";

    public static void main(String[] args) throws Exception {
        // 1. 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        // 2. 设置 NameServer 地址
        producer.setNamesrvAddr(NAMESRV_ADDR);
        // 3. 启动生产者
        producer.start();

        // 4. 构建消息(Topic + Tag + 消息体)
        // Tag:消息子分类,便于消费者过滤(如 "order:create")
        Message message = new Message(
                TOPIC_NAME,
                "test_tag",
                "Hello RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET)
        );

        // 5. 同步发送(阻塞等待结果)
        SendResult sendResult = producer.send(message);
        System.out.printf("消息发送成功 | 状态:%s | 队列ID:%d | Offset:%d%n",
                sendResult.getSendStatus(),
                sendResult.getMessageQueue().getQueueId(),
                sendResult.getQueueOffset());

        // 6. 关闭生产者
        producer.shutdown();
    }
}

2. 异步发送(高吞吐场景,如日志收集)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * RocketMQ 异步生产者(非阻塞,回调处理结果)
 */
public class AsyncProducer {
    private static final String PRODUCER_GROUP = "async_producer_group";
    private static final String NAMESRV_ADDR = "127.0.0.1:9876";
    private static final String TOPIC_NAME = "test_topic";

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.start();
        // 异步发送不重试(默认重试2次,可关闭)
        producer.setRetryTimesWhenSendAsyncFailed(0);

        // 批量异步发送5条消息
        for (int i = 0; i < 5; i++) {
            Message message = new Message(
                    TOPIC_NAME,
                    "test_tag",
                    ("Async message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );

            // 异步发送 + 回调
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("消息发送成功 | 队列ID:%d | Offset:%d%n",
                            sendResult.getMessageQueue().getQueueId(),
                            sendResult.getQueueOffset());
                }

                @Override
                public void onException(Throwable e) {
                    System.err.println("消息发送失败:" + e.getMessage());
                    // 失败处理:重试/告警/写入死信
                }
            });
        }

        // 等待回调执行完成(避免主线程退出)
        Thread.sleep(5000);
        producer.shutdown();
    }
}

3. 单向发送(无需确认场景,如日志打点)

/**
 * 单向发送(只发不管结果,最高吞吐,可能丢消息)
 */
public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        Message message = new Message("test_topic", "test_tag", "Oneway message".getBytes());
        // 单向发送(无返回值)
        producer.sendOneway(message);

        producer.shutdown();
    }
}

四、核心实现:消费者(Push/Pull)

1. PushConsumer(生产推荐,主动推送)

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

/**
 * PushConsumer(推模式,Broker 主动推送消息给消费者)
 */
public class PushConsumer {
    // 消费者分组(同一组内消费者均分队列)
    private static final String CONSUMER_GROUP = "push_consumer_group";
    private static final String NAMESRV_ADDR = "127.0.0.1:9876";
    private static final String TOPIC_NAME = "test_topic";

    public static void main(String[] args) throws Exception {
        // 1. 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        // 2. 设置 NameServer 地址
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        // 3. 订阅 Topic(可指定 Tag,如 "test_tag || order_tag")
        // * 表示订阅所有 Tag
        consumer.subscribe(TOPIC_NAME, "*");

        // 4. 注册消息监听器(并发消费)
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                try {
                    String msgBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.out.printf("消费消息 | 队列ID:%d | Offset:%d | 内容:%s%n",
                            msg.getQueueId(),
                            msg.getQueueOffset(),
                            msgBody);
                    // 业务处理
                } catch (Exception e) {
                    System.err.println("消息处理失败:" + e.getMessage());
                    // 消费失败,返回 RECONSUME_LATER(稍后重试)
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            // 消费成功,返回 CONSUME_SUCCESS
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 5. 启动消费者
        consumer.start();
        System.out.println("消费者启动成功,等待消息...");
    }
}

2. PullConsumer(手动拉取,精准控制)

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.Set;

/**
 * PullConsumer(拉模式,手动控制拉取频率和 Offset)
 */
public class PullConsumer {
    private static final String CONSUMER_GROUP = "pull_consumer_group";
    private static final String NAMESRV_ADDR = "127.0.0.1:9876";
    private static final String TOPIC_NAME = "test_topic";

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式(默认)
        consumer.start();

        // 获取 Topic 所有队列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(TOPIC_NAME);
        for (MessageQueue mq : mqs) {
            System.out.printf("开始消费队列:%s%n", mq);
            // 拉取消息(从最新 Offset 开始)
            long offset = consumer.fetchConsumeOffset(mq, false);
            while (true) {
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32);
                    offset = pullResult.getNextBeginOffset();
                    // 处理消息
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            pullResult.getMsgFoundList().forEach(msg -> {
                                try {
                                    System.out.println("消费消息:" + new String(msg.getBody()));
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            });
                            // 提交 Offset
                            consumer.updateConsumeOffset(mq, offset);
                            break;
                        case NO_NEW_MSG:
                            // 无新消息,退出循环
                            return;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

五、核心特性(生产级)

1. 消息重试机制

  • 消费失败时返回 RECONSUME_LATER,RocketMQ 会自动重试(默认 16 次);
  • 重试间隔逐渐增加(1s→5s→10s→30s→1min…);
  • 超过重试次数后,消息进入死信队列(% DLQ%+ 消费者组名),需手动处理。

2. 事务消息(最终一致性)

适用于分布式事务场景(如订单创建 + 库存扣减):

// 1. 创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
// 2. 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务(如扣减库存)
        try {
            // 本地事务成功
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            // 本地事务失败
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 回查本地事务状态(Broker 回调)
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
// 3. 发送半事务消息
producer.sendMessageInTransaction(message, null);

3. 延时消息

支持定时投递(如订单超时关闭):

// 设置延时级别(1=1s,2=5s,3=10s,4=30s,5=1min...18=2h)
message.setDelayTimeLevel(3); // 10s 后投递

六、高频面试八股

1. RocketMQ 核心架构?

回答:RocketMQ 由 4 核心组件构成:

  • NameServer:轻量级命名服务,无状态,管理 Broker 路由信息,支持动态扩缩容;
  • Broker:核心消息节点,分 Master/Slave,Master 处理读写,Slave 同步数据做备份;
  • Producer:消息生产者,通过 NameServer 获取 Broker 地址,发送消息到 Master;
  • Consumer:消息消费者,通过 NameServer 获取 Broker 地址,从 Master/Slave 消费消息;
  • 架构特点:无单点故障,Broker 集群化,NameServer 多节点部署。

2. RocketMQ 如何保证消息不丢失?

回答:从生产、存储、消费三层保障:

  • 生产端
    1. 同步发送(等待 Broker 确认),开启重试(retryTimesWhenSendFailed);
    2. 事务消息通过回查机制保证最终一致性;
  • 存储端
    1. Broker 刷盘策略:SYNC_FLUSH(同步刷盘,写入磁盘后返回确认);
    2. Master/Slave 同步:SYNC_MASTER(Master 写入后同步 Slave,确认后返回);
  • 消费端
    1. 消费成功后返回 CONSUME_SUCCESS,失败返回 RECONSUME_LATER 重试;
    2. 关闭自动提交 Offset,手动控制消费确认。

3. RocketMQ 与 Kafka 的区别?

维度 RocketMQ Kafka
架构 NameServer + Broker(Master/Slave) Broker 集群(无独立命名服务)
消息模型 Topic + Tag(更细粒度过滤) Topic + Partition
事务支持 原生支持事务消息(最终一致性) 需结合事务 API 实现
延时消息 原生支持固定级别延时 需自定义实现
重试机制 内置重试 + 死信队列 需手动实现重试 / 死信
运维监控 内置 Dashboard,运维友好 需依赖第三方工具(如 Kafka Eagle)
生态 适配阿里系中间件,国产化优势 大数据生态完善(Spark/Flink)

4. RocketMQ 的消息存储机制?

回答:RocketMQ 消息存储基于文件系统,核心结构:

  • CommitLog:所有 Topic 的消息混合存储在一个日志文件中(默认 1G / 文件),顺序写入;
  • ConsumeQueue:消息消费队列(逻辑队列),存储 CommitLog 的偏移量和消息长度,相当于索引;
  • IndexFile:索引文件,根据消息 Key 快速查询消息;
  • 优势:CommitLog 顺序写提升性能,ConsumeQueue 减少磁盘 IO,索引加速查询。

5. RocketMQ 的负载均衡策略?

回答:消费者组内的队列分配策略:

  1. 平均分配(默认):队列均匀分配给消费者(如 8 队列 → 2 消费者,每人 4 个);
  2. 按机房分配:优先分配同机房的队列,减少跨机房网络开销;
  3. 一致性哈希:队列按哈希分布,消费者上下线时仅少量队列迁移;
  4. 自定义策略:实现 AllocateMessageQueueStrategy 接口自定义分配逻辑。

6. RocketMQ 如何处理消息积压?

回答

  1. 临时扩容:增加消费者数量(不超过队列数),提升消费并行度;
  2. 消费降级:暂停非核心业务消费,优先处理核心消息;
  3. 批量消费:增大 consumeMessageBatchMaxSize,批量拉取 / 处理消息;
  4. 分流处理:新增消费者组,临时消费积压消息,处理后写入新 Topic;
  5. 优化消费逻辑:异步处理消息,减少数据库 / 远程调用耗时。

7. RocketMQ 的高可用如何保证?

回答

  1. NameServer 高可用:多节点部署,无状态,客户端轮询连接 NameServer;
  2. Broker 高可用:Master/Slave 架构,Master 宕机后,消费者可切换到 Slave 消费;
  3. 消息高可用
    • 同步刷盘 + 同步复制,保证消息落地且有副本;
    • 消息重试 + 死信队列,避免消息丢失;
  4. 故障自动切换:Broker 宕机后,NameServer 自动更新路由,生产者 / 消费者感知并切换。

七、核心流程记忆(极简版)

生产者流程

  1. 创生产者 → 设 NameServer → 启动;
  2. 构建消息(Topic + Tag + 内容);
  3. 选择发送模式(同步 / 异步 / 单向);
  4. 发送消息 → 处理结果 / 回调;
  5. 关闭生产者。

消费者流程

  1. 创消费者 → 设 NameServer → 订阅 Topic;
  2. 注册监听器 → 处理消息(成功 / 失败返回对应状态);
  3. 启动消费者 → 持续消费。

八、测试验证

  1. 启动 NameServer:nohup sh mqnamesrv &
  2. 启动 Broker:nohup sh mqbroker -n 127.0.0.1:9876 &
  3. 创建 Topic:sh mqadmin updateTopic -n 127.0.0.1:9876 -t test_topic -b localhost:10911
  4. 运行生产者发送消息,消费者消费消息。
Logo

电商企业物流数字化转型必备!快递鸟 API 接口,72 小时快速完成物流系统集成。全流程实战1V1指导,营造开放的API技术生态圈。

更多推荐