瓜农老梁

一个想分享点干货的家伙,微信公众号「瓜农老梁」

0%

MQ33# RocketMQ消费--Broker端处理逻辑

问题思考

1.Broker是如何处理消费流程的?

2.消费进度是如何流转的?

说明:本文分析均为PUSH消费模式

Broker处理消费流程

本部分将消费的切分成三块梳理:Broker消费处理流程概览、查找消息流程、以及消息查询结果处理流程。

Broker消费处理流程概览

1
小结:在拉取消息时会进行Broker和主题读权限的判断,实战中若有必要可以封锁Broker的拉取权限从而禁止从该broker进行消费;或者封锁某主题的读权限禁止消费组从该主题消费消息。

查找消息流程

小结:如果需要从磁盘拉取消息则一次默认最多拉取8条,一次消息的消息大小最大为64K。

如果从缓存中拉取默认最多32条,一次拉取的消息大小最大256K。使用tagcode会在查找消息前进行过滤,使用SQL92过滤再消息查找出来后进行过滤。

消息查询结果处理流程

1
小结:建议开启slaveReadEnable=true,当拉取的消息超过Broker内存40%时会从Slave节点消费,Master不必从磁盘重新读取数据;transferMsgByHeap默认为true即消息先拉取到堆空间再返回到客户端;如果设置为false则使用Netty#FileRegion,可用零字节拷贝不必再拷贝到堆内存提高性能。

消费进度流转

客户端上报消费进度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
//@1 顺序消费/并发消费流程相同

//ConsumeMessageOrderlyService#processConsumeResult

//ConsumeMessageConcurrentlyService#processConsumeResult

if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {

//更新消费进度偏移量

this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);

}

@2 RemoteBrokerOffsetStore#updateOffset

AtomicLong offsetOld = this.offsetTable.get(mq);

MixAll.compareAndIncreaseOnly(offsetOld, offset);

@3 offsetTable存储结构:key为MessageQueue value为消费的偏移量进度

ConcurrentMap<MessageQueue, AtomicLong> offsetTable =

new ConcurrentHashMap<MessageQueue, AtomicLong>()

@4 定时同步消费进度

//持久化消息消费进度,默认5秒保存一次

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

MQClientInstance.this.persistAllConsumerOffset();

} catch (Exception e) {

log.error("ScheduledTask persistAllConsumerOffset exception", e);

}

}

}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

@5 RemoteBrokerOffsetStore#persistAll

for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())

this.updateConsumeOffsetToBroker(mq, offset.get());
1
小结:PUSH消费中消费进度存储在offsetTable中,定时任务每5秒钟上报Broker一次。

Broker端处理消费进度

处理客户端定时上报消费进度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//@1 ConsumerManageProcessor#processRequest#updateConsumerOffset

this.brokerController.getConsumerOffsetManager().commitOffset

//@2 ConsumerOffsetManager#commitOffset

String key = topic + TOPIC_GROUP_SEPARATOR + group;

this.commitOffset(clientHost, key, queueId, offset);

Long storeOffset = map.put(queueId, offset);

//@3 消费进度缓存结构

//key=topic@group

//value=ConcurrentMap<Integer/* queueId*/, Long/*offset*/>>

offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

//@4 5秒钟一次存储消费进度

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

try {

BrokerController.this.consumerOffsetManager.persist();

} catch (Throwable e) {

log.error("schedule persist consumerOffset error.", e);

}

}

}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

//@5 consumerOffset.json文件格式

"zeus-package-mismatch-topic@autosort-packagelog":{0:9055300,1:9055157,2:9055304,3:9055232}
1
小结:Broker接到客户端消费进度上报后更新缓存offsetTable,每隔5秒中定时任务将offsetTable消费进度存储在磁盘文件consumerOffset.json中。

消息拉取后实时更新消费进度

1
2
3
4
5
6
7
8
9
10
11
//@1 PullMessageProcessor#processRequest

if (storeOffsetEnable) {

//更新消费进度

this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),

requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());

}
1
小结:PUSH消费客户端拉取消息后会实时更新消费的进度。

消费进度流转示意图