RockectMQ

  1. 消息发送机制
  • 验证消息
  • 查找路由
  • 消息发送
  1. producer 如何实现负载均衡

    通过TopicPublishInfo#selectOneMessageQueue和故障延迟机制对队列和 broker 进行轮循选择

  2. consumer 如何实现负载均衡

  • 消费端的负载均衡是通过RebalanceService.doRebalance 方法来实现的,有五种不同的算法:
    • 平均、轮循、一致性 hash、通过 config 配置、通过 broker 部署的机房
  1. 各个模块作用
  • NameServer: 通过多个 hashmap 存放了队列和 broker 的信息,通过与 broker 的心跳功能来实现路由的注册功能
  • Commitlog 用于顺序存储所有的topic 消息,为了加快消息消费,引入了 ConsumeQueue,为了加快消息的查找,引入了 IndexFile。
  • 通过 MappedFile 与内存进行组织映射,然后通过 flush 方法将消息持久化到磁盘中
  1. producer保证消息不会丢失

1.默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功

2.采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失

3.RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功

broke 保证消息不会丢失

1.消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的

2.Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中

3.Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失

consumer保证消息不会丢失

1.Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标

2.如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset

3.如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作

4.如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的消息到本地

第二章、NameServer ——RocketMQ 的路由中心

1、NameServer 存放的信息:

  • topicQueueTable: Topic 消息队列路由信息,消息发送时根据路由表进行负载均衡
  • brokerAddrTable: broker 基础信息,name、集群、地址
  • clusterAddrTable: broker 集群信息,存储集群中所有 Broker 名称
  • brokerLiveTable: broker 状态信息,NameServer 每次收到心跳包时会替换该信息
  • filterServerTable : Broker 上的 FilterServer 列表,用于类模式消息过滤

这几个 table 都是 Hashmap,里面存储是不同的 data对象。

2、路由注册

通过 Broker 与 NameServer 的心跳功能来实现的。

3、路由删除

NameServer 会 10s 扫描一次 brokerLiveTable 状态表,当 lastUpdateTimestamp 的时间戳已经 120s 灭有更新时,就认为 broker 失效,移除 broker 关闭连接。同时更新 topicQueueTable, brokerAddrTable, brokerLiveTable, filterServerTable。

4、路由发现

Topic 路由发生变化之后,NameServer 不主动推送给客户端,而是由客户端定时拉取主题最新的路由。

调用RouterInfoManager 的方法,从路由表 topicQueueTable、brokerAddrTable、filterServerTable 中分别填充 TopicRouteData 数据。

第三章 RocketMQ 的消息发送

分为同步、异步和单向发送三种方式

需要思考的问题:

消息队列如何进行负载?

消息发送如何实现高可用?120s 的下线机制怎么保证高可用?

批量消息发送如何实现一致性?

一、生产者启动流程

DefaultMQProducer 中有各种的发送和查找偏移量的基础方法。

从 DefaultMQProducerImpl 的 start 方法开始

1)检查 producerGroup 是否符合要求,并改变生产者的 instanceName 为进程 id

2)创建 MQClientInstance 实例

3)向MQClientInstance 注册,将当前生产者加入到 MQClientInstance 管理中

4)启动 MQClientInstance

二、消息发送的基本流程

验证消息,查找路由,消息发送

1、消息长度验证

2、查找主题路由信息

如果生产者中缓存了 topic 的路由信息,则直接返回

否则就去 NameServer 查询 topic 的路由信息

3、选择消息队列

1)默认机制 调用TopicPublishInfo#selectOneMessageQueue这个方法进行选择

2)Broker 故障延迟机制

4、消息发送

5、批量消息发送

第四章 RocketMQ 消息存储

一、存储概要设计

主要存储的文件包括 CommitLog文件、ConsumeQueue 文件、IndexFile 文件。

CommitLog 顺序保存所有主题的消息。

ConsumeQueue 是为了提高消息消费的效率。每个消息主题包含多个消息消费队列。

四、存储文件组织与内存映射

2、MappedFile 内存映射文件

1)MappedFile 初始化

2)MappedFile 提交

首先创建 writeBuffer 缓冲区

3)MappedFile 刷盘

将内存中的数据持久化到磁盘,通过 MappedFile.flush 方法来实现

4)获取 MappedFile 最大读指针

5)MappedFile 销毁

先关闭 MappedFile,然后调用 release 释放资源,判断清理完成之后关闭文件通道,删除物理文件

3、TransientStorePool:短暂的存储数据池

​ RocketMQ 单独创建一个 MappedByteBuffer 内存缓存池,用来存储临时数据。数据先写入该内存映射中,然后由 commit 线程定时将数据从该内存复制到与目的物理文件对应的内存映射中。

五、RocketMQ 存储文件

1)commitLog:消息存储目录

2)config:运行期间一些信息配置

1、Commitlog 文件

提供根据 offset 返回下一个文件起始偏移量的方法

2、ConsumeQueue 文件

通过存储了 commitlog 的偏移量来加快消息消费的检索,相当于 Commitlog 的索引文件。

当消息到达 CommitLog 的时候,由专门的线程产生消息转发任务,从而构建 ConsumeQueue 文件和 indexFile 文件。

可以通过时间戳和二分法来进行检索。

3、index 索引文件

Index 索引文件是基于 Hash 索引的。

通过 putKey 方法将消息索引键与消息偏移量映射关系写入到 IndexFile中。

然后提供了根据 key 查找消息的方法。

4、checkPoint 文件

用于记录 CommitLog、ConsumeQueue、IndexFile 的刷盘时间点。

六、实时更新ConsumeQueue 和 IndexFile

当消息生产者提交的消息存储在 Commitlog 文件中时,ConsumeQueue 和 IndexFile 需要及时更新,否则消息无法被及时消费。

RocketMQ 通过开启一个线程 ReputMessageService 来准时转发 Commitlog 文件更新事件。

第五章 RocketMQ 消息消费

一、消费概述

一个消费组内可以包含多个消费者,每一个消费组可以订阅多个主题,消费组之间有集群模式和广播模式。

  • 集群模式:主题下的同一条消息只允许被其中一个消费者消费
  • 广播模式:主题下的同一条消息将被集群内的所有消费者消费一次

消费者与服务器之间的传送模式:推模式、拉模式。

  • 拉模式:消费端主动发起拉消息请求
  • 推模式:消息到达服务器之后,推送给消费者

RocketMQ 基于拉模式。

三、消费者启动流程

  1. 构建主题订阅信息 SubscriptionData 并加入到 RebalanceImpl 的订阅消息中
  2. 初始化 MQClientInstance、RebalanceImpl(消息重新负载实现类)等
  3. 初始化消息进度,如果是集群模式,那么消息进度保存在 broker 上面,如果是广播模式,那么消息进度存储在消费端
  4. 根据是否是顺序消费,创建消费端消费线程服务。ConsumeMessageService 主要负责消息消费,内部维护一个线程池
  5. 向 MQClientInstance 注册消费者,并启动 MQClientInstance,在一个 JVM中的所有消费者生产者持有同一个 MQClientInstance,MQClientInstance 只会启动一次。

四、消息拉取

  • 广播模式:每个消费者需要去拉取订阅主题下所有消费队列的消息
  • 集群模式:如何进行消息队列负载??通常是一个消息队列在同一时间只允许被一个消费者消费。
    一个消费者可以同时消费多个队列。

1、PullMessageService 实现机制

  • 是一个服务线程,通过 Run 方法启动。
  • 从 PullRequestQueue 中获取一个 PullRequest 消息拉取任务,如果 pullRequestQueue 为空,则将线程阻塞,知道有拉取任务被放入。然后调用 pullMessage 方法进行消息拉取。
  • pullRequest 在两个地方被添加:一个是执行完一次消息拉取之后,又将 PullRequest 对象放入到 pullRequestQueue。第二个是在 RebalanceImpl 中创建的。

2、ProcessQueue 实现机制

ProcessQueue是 MessageQueue 在消费端的重现、快照。

PullMessageService 从消息服务器每次默认拉取 32 条消息,按照消息的队列偏移量顺序放在ProcessQueue中。然后 PullMessageService 将消息提交到消费者线程池,消费成功后从 ProcessQueue 中移除。

3、消息拉取的基本流程

主要分为三步:

1)客户端封装消息拉取请求

  • 从 PullRequest 中获取 ProcessQueue
  • 进行消息拉取流控,从消息消费数量和间隔两个维度进行控制
  • 拉取该主题订阅信息,如果为空,结束本次消息拉取。
  • 构建消息拉取系统标记
  • 调用 PullAPIWrapper.pullKernelImpl 方法后与服务端交互
  • 根据 brokerName、brokerId 从 MQClientInstance 中获取 Broker 地址

2)服务器查找并返回消息

  • 根据订阅信息,构建消息过滤器
  • 调用 MessageStore.getMessage 查找消息
  • 根据 topicName 与队列编号获取 ConsumeQueue

3)客户端处理返回的消息

4)消息拉取长轮询机制分析

如何进行消息负载?

五、消息队列负载与重新分布机制

  • RocketMQ 消息队列重新分布是由 RebalanceService 线程来实现的。
  • RebalanceService 线程默认 20s 执行一个 mqClientFactory.doRebalance 方法。遍历所有已注册的消费者,然后执行 doRebalance 方法。
  • 每个 DefaultMQPushConsumerImpl 都持有一个单独的 RebalanceImpl 对象,该方法主要是遍历订阅信息,对每个主题的队列都进行重新负载。

RebalanceImpl#rebalanceByTopic 可以针对单个主题进行消息队列重新负载

  1. 从主题订阅信息缓存表中获取主题的队列信息。发送请求从 Broker 中
  2. 对 cidAll,mqAll 排序,确保同一个消费队列不会被多个消费者分配

由于每次进行队列重新负载时会从 broker 实时查询出当前消费组内所有消费组,并且对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消费队列从而消费消息。

消费负载均衡算法:6 种策略

六、

3、消费进度管理

集群模式:同一个消费组内的所有消费组都共享主题下的所有消息,所以要保存在一个每个消费者都能访问到的地方