RocketMQ
·
RocketMQ 从入门到实战 + 高频面试八股
RocketMQ 是阿里开源的分布式消息中间件,基于 Java 开发,主打高吞吐、低延迟、高可用,兼容 Kafka 协议,广泛用于电商、金融等核心业务场景。本文涵盖核心使用流程 + 高频面试考点,兼顾实战与面试需求。
一、核心依赖
<!-- RocketMQ 客户端核心依赖 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.7</version> <!-- 与服务端版本保持一致 -->
</dependency>
<!-- 可选:日志依赖 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.9</version>
</dependency>
二、核心概念(极简记忆)
| 术语 | 作用 |
|---|---|
| Topic | 消息主题(分类),生产者发消息到 Topic,消费者订阅 Topic |
| Message Queue | Topic 的物理分片(对应 Kafka 的 Partition),每个 Topic 包含多个队列 |
| Broker | 消息服务器,负责存储、转发消息,分为 Master/Slave 架构 |
| NameServer | 命名服务,管理 Broker 路由信息,生产者 / 消费者通过它获取 Broker 地址 |
| Producer | 生产者:发送消息到 Broker |
| Consumer | 消费者:订阅 Topic 并消费消息,分为 PushConsumer/PullConsumer |
| Group | 分组标识:ProducerGroup(生产者分组)/ConsumerGroup(消费者分组) |
| Offset | 队列内消息的偏移量,消费者通过 Offset 记录消费位置 |
三、核心实现:生产者(同步 / 异步 / 单向)
1. 同步发送(可靠场景,如订单创建)
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* RocketMQ 同步生产者(等待发送结果,保证消息可靠)
*/
public class SyncProducer {
// 生产者分组(同一组内生产者共享配置)
private static final String PRODUCER_GROUP = "sync_producer_group";
// NameServer 地址(多个用分号分隔)
private static final String NAMESRV_ADDR = "127.0.0.1:9876";
// 目标 Topic
private static final String TOPIC_NAME = "test_topic";
public static void main(String[] args) throws Exception {
// 1. 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
// 2. 设置 NameServer 地址
producer.setNamesrvAddr(NAMESRV_ADDR);
// 3. 启动生产者
producer.start();
// 4. 构建消息(Topic + Tag + 消息体)
// Tag:消息子分类,便于消费者过滤(如 "order:create")
Message message = new Message(
TOPIC_NAME,
"test_tag",
"Hello RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 5. 同步发送(阻塞等待结果)
SendResult sendResult = producer.send(message);
System.out.printf("消息发送成功 | 状态:%s | 队列ID:%d | Offset:%d%n",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
sendResult.getQueueOffset());
// 6. 关闭生产者
producer.shutdown();
}
}
2. 异步发送(高吞吐场景,如日志收集)
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
* RocketMQ 异步生产者(非阻塞,回调处理结果)
*/
public class AsyncProducer {
private static final String PRODUCER_GROUP = "async_producer_group";
private static final String NAMESRV_ADDR = "127.0.0.1:9876";
private static final String TOPIC_NAME = "test_topic";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);
producer.setNamesrvAddr(NAMESRV_ADDR);
producer.start();
// 异步发送不重试(默认重试2次,可关闭)
producer.setRetryTimesWhenSendAsyncFailed(0);
// 批量异步发送5条消息
for (int i = 0; i < 5; i++) {
Message message = new Message(
TOPIC_NAME,
"test_tag",
("Async message " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 异步发送 + 回调
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("消息发送成功 | 队列ID:%d | Offset:%d%n",
sendResult.getMessageQueue().getQueueId(),
sendResult.getQueueOffset());
}
@Override
public void onException(Throwable e) {
System.err.println("消息发送失败:" + e.getMessage());
// 失败处理:重试/告警/写入死信
}
});
}
// 等待回调执行完成(避免主线程退出)
Thread.sleep(5000);
producer.shutdown();
}
}
3. 单向发送(无需确认场景,如日志打点)
/**
* 单向发送(只发不管结果,最高吞吐,可能丢消息)
*/
public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message message = new Message("test_topic", "test_tag", "Oneway message".getBytes());
// 单向发送(无返回值)
producer.sendOneway(message);
producer.shutdown();
}
}
四、核心实现:消费者(Push/Pull)
1. PushConsumer(生产推荐,主动推送)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
/**
* PushConsumer(推模式,Broker 主动推送消息给消费者)
*/
public class PushConsumer {
// 消费者分组(同一组内消费者均分队列)
private static final String CONSUMER_GROUP = "push_consumer_group";
private static final String NAMESRV_ADDR = "127.0.0.1:9876";
private static final String TOPIC_NAME = "test_topic";
public static void main(String[] args) throws Exception {
// 1. 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
// 2. 设置 NameServer 地址
consumer.setNamesrvAddr(NAMESRV_ADDR);
// 3. 订阅 Topic(可指定 Tag,如 "test_tag || order_tag")
// * 表示订阅所有 Tag
consumer.subscribe(TOPIC_NAME, "*");
// 4. 注册消息监听器(并发消费)
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
String msgBody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.printf("消费消息 | 队列ID:%d | Offset:%d | 内容:%s%n",
msg.getQueueId(),
msg.getQueueOffset(),
msgBody);
// 业务处理
} catch (Exception e) {
System.err.println("消息处理失败:" + e.getMessage());
// 消费失败,返回 RECONSUME_LATER(稍后重试)
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功,返回 CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 5. 启动消费者
consumer.start();
System.out.println("消费者启动成功,等待消息...");
}
}
2. PullConsumer(手动拉取,精准控制)
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.Set;
/**
* PullConsumer(拉模式,手动控制拉取频率和 Offset)
*/
public class PullConsumer {
private static final String CONSUMER_GROUP = "pull_consumer_group";
private static final String NAMESRV_ADDR = "127.0.0.1:9876";
private static final String TOPIC_NAME = "test_topic";
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式(默认)
consumer.start();
// 获取 Topic 所有队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(TOPIC_NAME);
for (MessageQueue mq : mqs) {
System.out.printf("开始消费队列:%s%n", mq);
// 拉取消息(从最新 Offset 开始)
long offset = consumer.fetchConsumeOffset(mq, false);
while (true) {
try {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, "*", offset, 32);
offset = pullResult.getNextBeginOffset();
// 处理消息
switch (pullResult.getPullStatus()) {
case FOUND:
pullResult.getMsgFoundList().forEach(msg -> {
try {
System.out.println("消费消息:" + new String(msg.getBody()));
} catch (Exception e) {
e.printStackTrace();
}
});
// 提交 Offset
consumer.updateConsumeOffset(mq, offset);
break;
case NO_NEW_MSG:
// 无新消息,退出循环
return;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
五、核心特性(生产级)
1. 消息重试机制
- 消费失败时返回
RECONSUME_LATER,RocketMQ 会自动重试(默认 16 次); - 重试间隔逐渐增加(1s→5s→10s→30s→1min…);
- 超过重试次数后,消息进入死信队列(% DLQ%+ 消费者组名),需手动处理。
2. 事务消息(最终一致性)
适用于分布式事务场景(如订单创建 + 库存扣减):
// 1. 创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group");
// 2. 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务(如扣减库存)
try {
// 本地事务成功
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态(Broker 回调)
return LocalTransactionState.COMMIT_MESSAGE;
}
});
// 3. 发送半事务消息
producer.sendMessageInTransaction(message, null);
3. 延时消息
支持定时投递(如订单超时关闭):
// 设置延时级别(1=1s,2=5s,3=10s,4=30s,5=1min...18=2h)
message.setDelayTimeLevel(3); // 10s 后投递
六、高频面试八股
1. RocketMQ 核心架构?
回答:RocketMQ 由 4 核心组件构成:
- NameServer:轻量级命名服务,无状态,管理 Broker 路由信息,支持动态扩缩容;
- Broker:核心消息节点,分 Master/Slave,Master 处理读写,Slave 同步数据做备份;
- Producer:消息生产者,通过 NameServer 获取 Broker 地址,发送消息到 Master;
- Consumer:消息消费者,通过 NameServer 获取 Broker 地址,从 Master/Slave 消费消息;
- 架构特点:无单点故障,Broker 集群化,NameServer 多节点部署。
2. RocketMQ 如何保证消息不丢失?
回答:从生产、存储、消费三层保障:
- 生产端:
- 同步发送(等待 Broker 确认),开启重试(
retryTimesWhenSendFailed); - 事务消息通过回查机制保证最终一致性;
- 同步发送(等待 Broker 确认),开启重试(
- 存储端:
- Broker 刷盘策略:
SYNC_FLUSH(同步刷盘,写入磁盘后返回确认); - Master/Slave 同步:
SYNC_MASTER(Master 写入后同步 Slave,确认后返回);
- Broker 刷盘策略:
- 消费端:
- 消费成功后返回
CONSUME_SUCCESS,失败返回RECONSUME_LATER重试; - 关闭自动提交 Offset,手动控制消费确认。
- 消费成功后返回
3. RocketMQ 与 Kafka 的区别?
| 维度 | RocketMQ | Kafka |
|---|---|---|
| 架构 | NameServer + Broker(Master/Slave) | Broker 集群(无独立命名服务) |
| 消息模型 | Topic + Tag(更细粒度过滤) | Topic + Partition |
| 事务支持 | 原生支持事务消息(最终一致性) | 需结合事务 API 实现 |
| 延时消息 | 原生支持固定级别延时 | 需自定义实现 |
| 重试机制 | 内置重试 + 死信队列 | 需手动实现重试 / 死信 |
| 运维监控 | 内置 Dashboard,运维友好 | 需依赖第三方工具(如 Kafka Eagle) |
| 生态 | 适配阿里系中间件,国产化优势 | 大数据生态完善(Spark/Flink) |
4. RocketMQ 的消息存储机制?
回答:RocketMQ 消息存储基于文件系统,核心结构:
- CommitLog:所有 Topic 的消息混合存储在一个日志文件中(默认 1G / 文件),顺序写入;
- ConsumeQueue:消息消费队列(逻辑队列),存储 CommitLog 的偏移量和消息长度,相当于索引;
- IndexFile:索引文件,根据消息 Key 快速查询消息;
- 优势:CommitLog 顺序写提升性能,ConsumeQueue 减少磁盘 IO,索引加速查询。
5. RocketMQ 的负载均衡策略?
回答:消费者组内的队列分配策略:
- 平均分配(默认):队列均匀分配给消费者(如 8 队列 → 2 消费者,每人 4 个);
- 按机房分配:优先分配同机房的队列,减少跨机房网络开销;
- 一致性哈希:队列按哈希分布,消费者上下线时仅少量队列迁移;
- 自定义策略:实现
AllocateMessageQueueStrategy接口自定义分配逻辑。
6. RocketMQ 如何处理消息积压?
回答:
- 临时扩容:增加消费者数量(不超过队列数),提升消费并行度;
- 消费降级:暂停非核心业务消费,优先处理核心消息;
- 批量消费:增大
consumeMessageBatchMaxSize,批量拉取 / 处理消息; - 分流处理:新增消费者组,临时消费积压消息,处理后写入新 Topic;
- 优化消费逻辑:异步处理消息,减少数据库 / 远程调用耗时。
7. RocketMQ 的高可用如何保证?
回答:
- NameServer 高可用:多节点部署,无状态,客户端轮询连接 NameServer;
- Broker 高可用:Master/Slave 架构,Master 宕机后,消费者可切换到 Slave 消费;
- 消息高可用:
- 同步刷盘 + 同步复制,保证消息落地且有副本;
- 消息重试 + 死信队列,避免消息丢失;
- 故障自动切换:Broker 宕机后,NameServer 自动更新路由,生产者 / 消费者感知并切换。
七、核心流程记忆(极简版)
生产者流程
- 创生产者 → 设 NameServer → 启动;
- 构建消息(Topic + Tag + 内容);
- 选择发送模式(同步 / 异步 / 单向);
- 发送消息 → 处理结果 / 回调;
- 关闭生产者。
消费者流程
- 创消费者 → 设 NameServer → 订阅 Topic;
- 注册监听器 → 处理消息(成功 / 失败返回对应状态);
- 启动消费者 → 持续消费。
八、测试验证
- 启动 NameServer:
nohup sh mqnamesrv &; - 启动 Broker:
nohup sh mqbroker -n 127.0.0.1:9876 &; - 创建 Topic:
sh mqadmin updateTopic -n 127.0.0.1:9876 -t test_topic -b localhost:10911; - 运行生产者发送消息,消费者消费消息。
更多推荐




所有评论(0)