瓜农老梁

一个想分享点干货的家伙,微信公众号「瓜农老梁」

0%

MQ42# RocketMQ消息轨迹

Broker配置

首先看下broker.conf配置的两个属性

| 属性 |默认值 |

| — | — |

|traceTopicEnable |false |

| msgTraceTopicName | RMQ_SYS_TRACE_TOPIC |

在一个集群中可以配置一台机器专门负责消息轨迹的收集工作,该台机器上配置traceTopicEnable = true,

borker启动的时候自动创建默认轨迹topic

TopicConfigManager.java构造方法,BrokerController在启动的时候,会初始化TopicConfigManager实现trace topic的创建工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{

if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {

String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();

TopicConfig topicConfig = new TopicConfig(topic);

this.systemTopicList.add(topic);

topicConfig.setReadQueueNums(1);

topicConfig.setWriteQueueNums(1);

this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);

}

}

客户端发送实现

客户端发送

DefaultMQProducer producer = new DefaultMQProducer(“ProducerGroupName”,true);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {

this.producerGroup = producerGroup;

defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);

//if client open the message trace feature

if (enableMsgTrace) {

try {

AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);

dispatcher.setHostProducer(this.getDefaultMQProducerImpl());

traceDispatcher = dispatcher;

//为消息轨迹注册hook,在消息发送前执行

this.getDefaultMQProducerImpl().registerSendMessageHook(

new SendMessageTraceHookImpl(traceDispatcher));

} catch (Throwable e) {

log.error("system mqtrace hook init failed ,maybe can't send msg trace data");

}

}

}

SendMessageTraceHookImpl 实现了SendMessageHook接口,在消息发送前后会被调用

AsyncTraceDispatcher 主要负责消息的发送工作;内部队列,由线程池批量(100条)发送

Hook调用

发送前hook调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//如果有hook在消息发送前执行,消息轨迹通过这种方式记录

if (this.hasSendMessageHook()) {

context = new SendMessageContext();

context.setProducer(this); //发送对象

context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); //生产组

context.setCommunicationMode(communicationMode); //发送模式

context.setBornHost(this.defaultMQProducer.getClientIP()); //客户端IP

context.setBrokerAddr(brokerAddr); //发往broker的地址

context.setMessage(msg); //消息内容

context.setMq(mq); //消息 Queue

String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);

if (isTrans != null && isTrans.equals("true")) {

context.setMsgType(MessageType.Trans_Msg_Half);

}

if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {

context.setMsgType(MessageType.Delay_Msg);

}

this.executeSendMessageHookBefore(context); //执行自定义个hook业务

}

发送后hook调用

1
2
3
4
5
6
7
8
9
//消息发送后执行的hook,消息轨迹会调用

if (this.hasSendMessageHook()) {

context.setSendResult(sendResult);

this.executeSendMessageHookAfter(context);

}

发送轨迹

Producer启动时注册钩子,该钩子持有负责消息发送的AsyncTraceDispatcher实例,消息发送后进而发送消息轨迹

发送轨迹的消息格式

客户端消费轨迹实现

消费轨迹:与消息发送的轨迹实现思路相同

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“CID_JODIE_1”,true);

注册消费钩子

ConsumeMessageTraceHookImpl实现了ConsumeMessageHook,在消费的前后会进行回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,

AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {

this.consumerGroup = consumerGroup;

this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;

defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);

if (enableMsgTrace) {

try {

AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);

dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());

traceDispatcher = dispatcher;

//注册消费hook

this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(

new ConsumeMessageTraceHookImpl(traceDispatcher));

} catch (Throwable e) {

log.error("system mqtrace hook init failed ,maybe can't send msg trace data");

}

}

}

ConsumeMessageConcurrentlyService.ConsumeRequest.run消费前执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//消费前执行hook 消费轨迹会执行

ConsumeMessageContext consumeMessageContext = null;

if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext = new ConsumeMessageContext();

consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());

consumeMessageContext.setProps(new HashMap<String, String>());

consumeMessageContext.setMq(messageQueue);

consumeMessageContext.setMsgList(msgs);

consumeMessageContext.setSuccess(false); //消费状态

ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);

}

消费后执行

1
2
3
4
5
6
7
8
9
10
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {

consumeMessageContext.setStatus(status.toString());

consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);

ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);

}

消费轨迹格式

分为两部分,一部分为消费前,一部分为消费后