问题思考
1.主题队列是如何分配的?
2.什么时候会进行负载均衡?
3.负载均衡后是否会导致消息重复消费?
调用链条
初始化链条
1 | @1 DefaultMQPushConsumerImpl#start |
启动链条
1 | @1 DefaultMQPushConsumerImpl#start |
1 | 小结:从初始化链和调用链可以看出RebalanceService为线程类,随着消费启动时而启动,消费不退出则一直运行着。 |
负载均衡流程
负载均衡链条
1 | @1 RebalanceService#run |
负载均衡流程
1 | 小结:在负载均衡时,会循环该消费组订阅的所有Topic都会执行负载均衡。 |
更新缓存processQueue流程
1 | 小结: |
向Broker发送心跳流程
队列分配算法
负载均衡流程图中对clientId和分区队列的分配提交给分区算法执行,那该算法是如何运作的呢?
接口AllocateMessageQueueStrategy队列分配策略提供五种分配算法实现:
* 1.平均分配策略AllocateMessageQueueAveragely
* 2.环形分配策略AllocateMessageQueueAveragelyByCircle
* 3.机房分配策略AllocateMessageQueueByMachineRoom
* 4.一致性Hash分配策略AllocateMessageQueueConsistentHash
* 5.配置文件分配策略AllocateMessageQueueByConfig
除此之外可以自定义分配算法,实现接口接口即可,默认使用平均分配算法,也是最常用的,下面以该算法看看如何工作的。
1 | public List<MessageQueue> allocate |
代码不是很好阅读,看下面验证结果即可。
平均分配算法验证
* 只有一个clientId时分配情况
会把1个Broker的16个分区全部分配给该客户端,每隔20秒触发一次负载均衡。
currentCID=2.0.1.138@consumer01分到的队列为0~15
1 | ----------2019-08-04 22:10:15----------- |
* 新加入第二个client时
此时有两个clinetId分别为2.0.1.138@consumer01和2.0.1.138@consumer02,1个 Broker16个分区的分配情况。
currentCID=2.0.1.138@consumer01分到的分区为0~7
currentCID=2.0.1.138@consumer02分到的分区为8~16
1 | ----------2019-08-04 22:12:25----------- |
* 新加入第三个client时
此时有三个客户端2.0.1.138@consumer01、2.0.1.138@consumer02、2.0.1.138@consumer03,1个Broker的16个队列的分配情况。
currentCID=2.0.1.138@consumer01分到的队列0~5
currentCID=2.0.1.138@consumer02分到的队列6~10
currentCID=2.0.1.138@consumer03分到的队列11~15
1 | ----------2019-08-04 22:13:58----------- |
四、总结
1.主题队列是如何分配的?
备注:见队列分配算法,通常使用平均分配算法。
2.什么时候会进行负载均衡?
备注:负载均衡线程每隔20秒执行一次,当有新客户端退出或者加入或者新的Broker加入或掉线都会触发重新负载均衡。
3.负载均衡后是否会导致消息重复消费?
备注:
并发消费可能导致消息被重复消费,看以下代码。如果负载均衡前已分配的队列不在负载均衡后的新队列集合中,会丢弃该队列即:processQueue.isDropped()。而该队列可能已经被消费完了,在处理结果时被丢弃了,消费进度没有更新。别的消费客户端重新拉取该队列时造成重复消费。
顺序消费不会导致消息被重复消费。
1 | //并发消费对结果的处理 |