[toc]
1. 简介
RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点
2. 组成
他主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。
RocketMQ架构上主要分为四部分,如上图所示:
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳. Consumer既可以从Master订阅消息,也可以从Slave订阅消息. Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。
主要包括两个功能:
Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。
Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
集群工作流程:
启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
如果是自动创建主题, 则会沿用默认主题
TBW102
的配置,它在哪,自动创建的主题就在哪Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
NameServer:
主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。
NameServer压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。
但有一点需要注意,Broker向NameServer发心跳时, 会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话, 网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败。
NameServer 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。
每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。
Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息。
RocketMQ 提供了三种方式发送消息:同步、异步和单向
同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
Broker
消息中转角色,负责存储消息,转发消息。
Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。
Broker负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。
Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费。
Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费和广播消息,提供实时的消息订阅机制。
Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。(默认20s拉取一次)
Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。
push 方式其本质是加快版的pull(长轮询), 从结果来看就像是push一样
消息领域模型
Message
Message(消息)就是要传输的信息。
一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。
Topic
Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。
一个 Topic 也可以被 0个、1个、多个消费者订阅。
Tag
Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。
标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。
Group
分组,一个组可以订阅多个Topic。
分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的
Queue
在Kafka中叫Partition,每个Queue内部是有序的,在RocketMQ中分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。
读写队列,则是在做路由信息时使用。在消息发送时,使用写队列个数返回路由信息,而消息消费时按照读队列个数返回路由信息。在物理文件层面,只有写队列才会创建文件。举个例子:写队列个数是8,设置的读队列个数是4.这个时候,会创建8个文件夹,代表0 1 2 3 4 5 6 7,但在消息消费时,路由信息只返回4,在具体拉取消息时,就只会消费0 1 2 3这4个队列中的消息,4 5 6 7中的信息压根就不会被消费。反过来,如果写队列个数是4,读队列个数是8,在生产消息时只会往0 1 2 3中生产消息,消费消息时则会从0 1 2 3 4 5 6 7所有的队列中消费,当然 4 5 6 7中压根就没有消息 ,假设消费group有两个消费者,事实上只有第一个消费者在真正的消费消息(0 1 2 3),第二个消费者压根就消费不到消息。由此可见,只有readQueueNums>=writeQueueNums,程序才能正常进行
简单来说就是, 读队列表示消费的数量, 写队列表示生产的数量
rocketmq设置读写队列数的目的在于方便队列的缩容和扩容。思考一个问题,一个topic在每个broker上创建了128个队列,现在需要将队列缩容到64个,怎么做才能100%不会丢失消息,并且无需重启应用程序?
最佳实践:先缩容写队列128->64,写队列由0 1 2 ……127缩至 0 1 2 ……..63。等到64 65 66……127中的消息全部消费完后,再缩容读队列128->64.(同时缩容写队列和读队列可能会导致部分消息未被消费)
Message Queue
Message Queue(消息队列),主题被划分为一个或多个子主题,即消息队列。
一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。
消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。
topic下有消息队列, 消息队列又分为读/写队列, 应该是这样吧?????
Offset
在RocketMQ 中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset 来访问,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。
也可以认为 Message Queue 是一个长度无限的数组,Offset 就是下标。
消息消费模式
消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。
默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
而广播消费消息会发给消费者组中的每一个消费者进行消费。
Message Order
Message Order(消息顺序)有两种:Orderly(顺序消费)和Concurrently(并行消费)。
顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。
并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。
3. 生产者
3.1 三种发送消息方式
rocketmq支持三种发送消息的方式,分别是同步发送(sync),异步发送(async)和直接发送(oneway)
同步发送 sync 发送消息采用同步模式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。
这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(
DefaultMQProducer#getRetryTimesWhenSendFailed
)。 发送的结果存在同一个消息可能被多次发送给给broker,这里需要应用的开发者自己在消费端处理幂等性问题。异步发送 async 发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数sendCallback来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。
同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(
DefaultMQProducer#getRetryTimesWhenSendAsyncFailed
)。发送的结果同样存在同一个消息可能被多次发送给给broker,需要应用的开发者自己在消费端处理幂等性问题。直接发送 one-way 采用one-way发送模式发送消息的时候,发送端发送完消息后会立即返回,不会等待来自broker的ack来告知本次消息发送是否完全完成发送。这种方式吞吐量很大,但是存在消息丢失的风险,所以其适用于不重要的消息发送,比如日志收集。
4. 消费者
5. broker
5.1 动态、增减 Broker
扩容
对集群进行扩容的时候,可以动态增加 Broker 角色的机器 。 只增加 Broker 不会对原有的 Topic 产生影响,原来创建好的 Topic 中数据的读写依然在原来的那些 Broker 上进行 。
集群扩容后,
一种是可以把新建的 Topic 指定到新的 Broker 机器上,均衡利用资源;
另一种方式是通过 updateTopic 命令更改现有的 Topic 配置,在新加的 Broker 上创建新的队列 。 比如 TestTopic 是现有的一个 Topic ,因为数据量增大需要扩容,新增的一个 Broker 机器地址是 192 . 168.0.1:10911 ,这个时候执行下面的命令: sh ./bin/mqadmin updateTopic -b 192.168.0.1:10911 -t TestTopic -n 192.168.0.100:9876
,结果是在新增的 Broker 机器上,为 TestTopic 新创建了 8个读写队列 。
缩容
当某个 Topic 有多个 Master Broker,停了其中一个,这时候是否会丢失消息呢?
答案和 Producer 使用的发送消息的方式有关,
如果使用同步方式 send ( msg )发送,在DefaultMQProducer 内部有个自动重试逻辑,其中一个 Broker 停了,会自动向另一个 Broker 发消息,不会发生丢消息现象。
如果使用异步方式发送 send ( msg, callback ),或者用 sendOneWay 方式,会丢失切换过程中的消息 。
因为在异步和 sendOneWay 这两种发送方式下,Producer.setRetryTimesWhensendFailed 设置不起作用,发送失败不会重试 。DefaultMQProducer 默认每 30 秒到 NameServer 请求最新的路由消息, Producer如果获取不到已停止的 Broker 下的队列信息,后续就自动不再向这些队列发送消息 。
5.2 消息存储
5.2.1 消息存储整体架构
消息存储架构图中主要有下面三个跟消息存储相关的文件构成。
(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,而不是存储生产者的消息本身,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset(8字节),消息大小size(4字节)和消息Tag的HashCode值(8字节)。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;
(3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME \store\index${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
indexFile 这个文件用来加快消息查询的速度 。 按照Message Key查询消息就会用到这个文件
indexFile落盘并不是采取定时刷盘机制,而是每更新一次索引文件就会将上一次的改动刷写到磁盘 。 见文件
org.apache.rocketmq.store.index.IndexService#flush
在上面的RocketMQ的消息存储整体架构图中可以看出,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
5.2.2 页缓存与内存映射
页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。
在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。
另外,RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。
6. topic
一个topic可以分布在多broker上, 所以只有多个broker合起来才是一个完整的topic内容
5. RocketMQ 集群部署模式
0. 单 master 模式
也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用,适合个人学习使用。
- 多 master 模式 多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。 优点:所有模式中性能最高 缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。 注意:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该 topic 的应用造成影响。
- 多 master 多 slave 异步复制模式 在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式。 优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。 缺点:使用异步复制的同步方式有可能会有消息丢失的问题。
- 多 master 多 slave 同步双写模式 同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。 优点:同步双写的同步模式能保证数据不丢失。 缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。 刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储) 同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步) 注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。
几个核心参数之间可按需组合 => master和slave的数量/ 刷盘策略/ master和salve之间的同步方式
A&Q
commitLog的消息是怎么同步到consumerQueue中的?
RocketMQ 通过开启 一个线程 ReputMessageServcie 来准实时转发 CommitLog 文件更新事件 , 相应 的任务处理器根据转发的消息及时更新 ConsumeQueue 、 IndexFile 文件 。broker启动时就会启动同步线程,
- broker启动后会取
math.max(所有队列中物理偏移量,commitLog低水位)
, 得应该从哪个位置开始追加队列消息 - 然后就会启动线程, 每1毫秒执行一次
- 从commitLog里取出消息来,通过调用 doDispatch方法 。 最终将分别调用 CommitLogDispatcherBuildConsumeQueue (构建消息消费队列 )、CommitLogDispatcherBuildlndex (构建索引文件)
- 消息消 费 队列转发任务实现类为 :
CommitLogDispatcherBuildConsumeQueue
,内部最终将调用putMessagePositioninfo
方法 , - 根据消息 主题与 队列 ID ,先获取对应的 ConumeQueue,
- 依 次将消息偏移量、消息长度、 tag的 hashcode 写入到 ByteBuffer 中,并根据 consumeQueueOffset 计算 ConumeQu eue 中的物理地址,将内 容追加到 ConsumeQueue 的内存映射文件中(本操作只追击并不刷盘 ), ConumeQueue 的刷盘方式固定为异步刷盘模式
总结, broker启动后就会启动监听线程,每1毫秒执行一次, 把消息从commitlog取出来,分别去构建consumerQueue和indexFile,
构建consumerQueue的消息写到MappedByteBuffer中(
MappedFile
类),只写内存不刷盘- broker启动后会取
broker 的高可用