瓜农老梁

一个想分享点干货的家伙,微信公众号「瓜农老梁」

0%

问题思考

在消费消息时处处能看到处理队列ProcessQueue的身影,既然随处可见也一定很重要,那有必要分析下为何重要了。

1
2
3
4
5
1.ProcessQueue提供哪些方法?

2.这些方法的作用是什么?

3.哪里调用了这些方法?
阅读全文 »

消息拉取与处理

消息拉取

1
小结:PullMessageService处理拉取消息请求。通过组织RequetHeader需要包含从哪里开始拉取(ConsumerGroup、Topic,Queue,queueOffset)等信息,向Broker发起请求,取回消息后对消息进行处理。当该Queue的消息数量超过1000,或者最小与最大偏移量之间的差距超过默认2000也会触发限流,即:延迟50毫秒放入请求队列。也可以通过挂起消费线程来延迟(1秒)消息拉取,从而达到消费限流作用。
阅读全文 »

问题描述

日志目录(可配置)/data/rocketmq/store/commitlog会有20位长度的日志文件。

1.日志文件什么时候创建的?

2.日志文件创建流程是什么?

3.日志文件和内存映射是怎么样的?

1
2
3
4
5
6
7
8
9
10
11
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:50 00000117290188144640

-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:52 00000117291261886464

-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:54 00000117292335628288

-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117293409370112

-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:57 00000117294483111936

-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117295556853760
阅读全文 »

先梳理消息存储主干流程。本分切分为两部分,第一部分消息存储流程概览,主要为校验流程;第二部分CommitLog存储概览,即消息存储流程。

消息存储流程概览

调用链

1
2
3
4
5
6
7
@1 SendMessageProcessor#sendMessage

//消息存储

PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

@2 DefaultMessageStore#putMessage
阅读全文 »