[TOC]

1. 整体架构

img

RocketMQ 主要由 Broker、NameServer、Producer 和 Consumer 组成的一个集群。

  • NameServer:整个集群的注册中心和配置中心,管理集群的元数据。包括 Topic 信息和路由信息、Producer 和 Consumer 的客户端注册信息、Broker 的注册信息。
  • Broker:负责接收消息的生产和消费请求,并进行消息的持久化和消息的读取。
  • Producer:负责生产消息。
  • Consumer:负责消费消息。
  1. broker有很多台,可以分散存储和被访问的压力, 且每个broker可以存在多个slave,当消费者从slave获取消息时,还能分担broker压力,进一步提高消费性能

  2. 生产和消费直连nameServer,可以快速定位到主题在哪个broker上,而且broker上还有这些数据(meta)的缓存, 甚至连nameServer的访问都省略了

  3. 消费集群可以增加数量, 以增加消费性能

1.1. 网络模型

RocketMQ 使用 Netty 框架实现高性能的网络传输。

Netty 的高性能传输的体现

  • 非阻塞 IO
  • Ractor 线程模型
  • 零拷贝。使用 FileChannel.transfer 避免在用户态和内核态之间的拷贝操作;通过 CompositeByteBuf 组合多个 ByteBuffer;通过 slice 获取 ByteBuffer 的切片;通过 wrapper 把普通 ByteBuffer 封装成 netty.ByteBuffer。

RocketMQ 网络模型

RocketMQ 的 Broker 端基于 Netty 实现了主从 Reactor 模型。架构如下:

img

具体流程:

  1. eventLoopGroupBoss 作为 acceptor 负责接收客户端的连接请求
  2. eventLoopGroupSelector 负责 NIO 的读写操作
  3. NettyServerHandler 读取 IO 数据,并对消息头进行解析
  4. disatch 过程根据注册的消息 code 和 processsor 把不同的事件分发给不同的线程。由 processTable 维护(类型为 HashMap)

1.2. 业务线程池隔离

RocketMQ 对 Broker 的线程池进行了精细的隔离。使得消息的生产、消费、客户端心跳、客户端注册等请求不会互相干扰。如下是各个业务执行线程池和 Broker 处理的报文类型的对应关系,从下图中我们也可以看出 Broker 的核心功能点。

img

2. 生产者

2.1 三种发送方式 可选

RocketMQ 支持三种消息发送方式:同步发送、异步发送和 One-Way 发送。One-Way 发送时客户端无法确定服务端消息是否投递成功,因此是不可靠的发送方式。异步发送也存在一定几率丢失,

客户端同步发送消息 时序图:

img

2.2. MQClientInstance 的单例模式统一管理维护网络通道,发送消息前只需要做一次服务状态可用性检查即可

2.3. 客户端故障容错机制

MQFaultStrategy 实现了基于 RT 耗时的容错策略。当某个 Broker 的 RT 过大时,认为该 Broker 存在问题,会禁用该 Broker 一段时间

这样免得再次访问该broker造成性能降低

3. broker写入

Broker 接收消息时序图

img

流程说明

  1. Broker 通过 Netty 接收 RequestCode 为 SEND_MESSAGE 的请求,并把该请求交给 SendMessageProcessor 进行处理。
  2. SendMessageProcessor 先解析出 SEND_MESSAGE 报文中的消息头信息(Topic、queueId、producerGroup 等),并调用存储层进行处理。
  3. putMessage 中判断当前是否满足写入条件:Broker 状态为 running;Broker 为 master 节点;磁盘状态可写(磁盘满则无法写入);Topic 长度未超限;消息属性长度未超限;pageCache 未处于繁忙状态(pageCachebusy 的依据是 putMessage 写入 mmap 的耗时,如果耗时超过 1s,说明由于缺页导致页加载慢,此时认定 pageCache 繁忙,拒绝写入)。
  4. 从 MappedFileQueue 中选择已经预热过的 MappedFile。
  5. AppendMessageCallback 中执行消息的操作 doAppend,直接对 mmap 后的文件的 bytbuffer 进行写入操作。

3.1 自旋锁减少上下文切换

RocketMQ 的 CommitLog 为了避免并发写入,使用一个 PutMessageLock。PutMessageLock 有 2 个实现版本:PutMessageReentrantLockPutMessageSpinLock

PutMessageReentrantLock 是基于 java 的同步等待唤醒机制;PutMessageSpinLock 使用 Java 的 CAS 原语,通过自旋设值实现上锁和解锁。RocketMQ 默认使用 PutMessageSpinLock 以提高高并发写入时候的上锁解锁效率,并减少线程上下文切换次数。

3.2 MappedFile 预热和零拷贝机制

MappedFile 预热

RocketMQ 消息写入对延时敏感,为了避免在写入消息时,CommitLog 文件尚未打开或者文件尚未加载到内存引起的 load 的开销,RocketMQ 实现了文件预热机制。项目启动时就开启了线程去预热, 预热代码关键节点如下:

graph TD
org.apache.rocketmq.broker.BrokerController#initialize -->
org.apache.rocketmq.common.ServiceThread#start -->
org.apache.rocketmq.store.MappedFile#warmMappedFile

Linux 系统在写数据时候不会直接把数据写到磁盘上,而是写到磁盘对应的 PageCache 中,并把该页标记为脏页。当脏页累计到一定程度或者一定时间后再把数据 flush 到磁盘(当然在此期间如果系统掉电,会导致脏页数据丢失)。

零拷贝

RocketMQ 选择了 mmap + write 这种零拷贝方式,适用于业务级消息这种小块文件的数据持久化和传输

而 Kafka 采用的是 sendfile 这种零拷贝方式,适用于系统日志消息这种高吞吐量的大块文件的数据持久化和传输

image-20220729145338914

Java NIO - 零拷贝实现 | Java 全栈知识体系 (pdai.tech)

3.3 同步和异步刷盘机制

RocketMQ 提供了同步刷盘和异步刷盘两种机制。默认使用异步刷盘机制。

当 CommitLog 在 putMessage() 中收到 MappedFile 成功追加消息到内存的结果后,便会调用 handleDiskFlush() 方法进行刷盘,将消息存储到文件中。org.apache.rocketmq.store.CommitLog#submitFlushRequest 便会根据两种刷盘策略,调用不同的刷盘服务。

抽象类 FlushCommitLogService 负责进行刷盘操作,该抽象类有 3 中实现:

  • GroupCommitService:同步刷盘

  • FlushRealTimeService:异步刷盘

  • CommitRealTimeService:异步刷盘并且开启 TransientStorePool

    每个实现类都是一个 ServiceThread 实现类。ServiceThread 可以看做是一个封装了基础功能的后台线程服务。有完整的生命周期管理,支持 start、shutdown、weakup、waitForRunning。

同步刷盘流程

  • 所有的 flush 操作都由 GroupCommitService 线程进行处理

  • 当前接收消息的线程封装一个 GroupCommitRequest,并提交给 GroupCommitService 线程,然后当前线程进入一个 CountDownLatch 的等待

  • 一旦有新任务进来 GroupCommitService 被立即唤醒,并调用 MappedFile.flush 进行刷盘。底层是调用 mappedByteBuffer.force ()

  • flush 完成后唤醒等待中的接收消息线程。从而完成同步刷盘流程

异步刷盘流程

  • RocketMQ 每隔 200ms 进行一次 flush 操作(把数据持久化到磁盘)
  • 当有新的消息写入时候会主动唤醒 flush 线程进行刷盘
  • 当前接收消息线程无须等待 flush 的结果。

3.4 顺序写磁盘

扩展阅读:

RocketMQ的broker处理消息commit时,加锁应该使用自旋锁还是重入锁以及sendMessageThreadPoolNums-CSDN博客

4. 消费者

消息存储结构

RocketMQ 的存储结构最大特点:

  • 所有的消息写入转为顺序写
  • 读写文件分离。通过 ReputMessageService 服务生成 ConsumeQueue

img

结构说明

  • ConsumeQueue 与 CommitLog 不同,采用定长存储结构,如下图所示。为了实现定长存储,ConsumeQueue 存储了消息 Tag 的 Hash Code,在进行 Broker 端消息过滤时,通过比较 Consumer 订阅 Tag 的 HashCode 和存储条目中的 Tag Hash Code 是否一致来决定是否消费消息。
  • ReputMessageService 持续地读取 CommitLog 文件并生成 ConsumeQueue。

4.1 顺序消费与并行消费

串行消费和并行消费最大的区别在于消费队列中消息的顺序性。顺序消费保证了同一个 Queue 中的消费时的顺序性。RocketMQ 的顺序性依赖于分区锁的实现。

并行消费

  1. 并行消费的实现类为 ConsumeMessageConcurrentlyService。

  2. PullMessageService 内置一个 scheduledExecutorService 线程池,主要负责处理 PullRequest 请求,从 Broker 端拉取最新的消息返回给客户端。拉取到的消息会放入 MessageQueue 对应的 ProcessQueue。

  3. ConsumeMessageConcurrentlyService 把收到的消息封装成一个 ConsumeRequest,投递给内置的 consumeExecutor 独立线程池进行消费。

  4. ConsumeRequest 调用 MessageListener.consumeMessage 执行用户定义的消费逻辑,返回消费状态。

  5. 如果消费状态为 SUCCESS。则删除 ProcessQueue 中的消息,并提交 offset。

  6. 如果消费状态为 RECONSUME。则把消息发送到延时队列进行重试,并对当前失败的消息进行延迟处理。

串行消费

  1. 串行消费的实现类为 ConsumeMessageOrderlyService。

  2. PullMessageService 内置一个 scheduledExecutorService 线程池,主要负责处理 PullRequest 请求,从 Broker 端拉取最新的消息返回给客户端。拉取到的消息会放入 MessageQueue 对应的 ProcessQueue。

  3. ConsumeMessageOrderlyService 把收到的消息封装成一个 ConsumeRequest,投递给内置的 consumeExecutor 独立线程池进行消费。

  4. 消费时首先获取 MessageQueue 对应的 objectLock,保证当前进程内只有一个线程在处理对应的的 MessageQueue, 从 ProcessQueue 的 msgTreeMap 中按 offset 从低到高的顺序取消息,从而保证了消息的顺序性。

  5. ConsumeRequest 调用 MessageListener.consumeMessage 执行用户定义的消费逻辑,返回消费状态。

  6. 如果消费状态为 SUCCESS。则删除 ProcessQueue 中的消息,并提交 offset。

  7. 如果消费状态为 SUSPEND。判断是否达到最大重试次数,如果达到最大重试次数,就把消息投递到死信队列,继续下一条消费;否则消息重试次数 + 1,在延时一段时间后继续重试。

    可见,串行消费如果某条消息一直无法消费成功会造成阻塞,严重时会引起消息堆积和关联业务异常。

4.2 push消息

RocketMQ支持pull和push操作, 它的push操作是加强版的pull的. 其本质就是一个 长轮询的pull操作, 具体流程如下:

  1. PullRequest 请求中有个参数 brokerSuspendMaxTimeMillis,默认值为 15s,控制请求 hold 的时长。

  2. PullMessageProcessor 接收到 Request 后,解析参数,校验 Topic 的 Meta 信息和消费者的订阅关系。对于符合要求的请求,从存储中拉取消息。

  3. 如果拉取消息的结果为 PULL_NOT_FOUND,表示当前 MessageQueue 没有最新消息。

  4. 此时会封装一个 PullRequest 对象,并投递给 PullRequestHoldService 内部线程的 pullRequestTable 中。

  5. PullRequestHoldService 线程会周期性轮询 pullRequestTable,如果有新的消息或者 hold 时间超时 polling time,就会封装 Response 请求发给客户端。 另外 DefaultMessageStore 中定义了 messageArrivingListener,当产生新的 ConsumeQueue 记录时候,会触发 messageArrivingListener 回调,立即给客户端返回最新的消息。

    长连接机制使得 RocketMQ 的网络利用率非常高效,并且最大限度地降低了消息拉取时的等待开销。实现了毫秒级的消息投递。

RocketMQ 高性能揭秘 - 知乎 (zhihu.com)