Broker配置
首先看下broker.conf配置的两个属性
| 属性 | 默认值 |
| --- | --- |
| traceTopicEnable | false |
| msgTraceTopicName | RMQ_SYS_TRACE_TOPIC |
在一个集群中可以配置一台机器专门负责消息轨迹的收集工作,该台机器上配置traceTopicEnable = true,
Broker启动的时候会自动创建默认的轨迹topic
TopicConfigManager.java构造方法,BrokerController在启动的时候,会初始化TopicConfigManager实现trace topic的创建工作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| {
// Only register the trace topic when the broker config has traceTopicEnable switched on.
if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
// The trace topic name is read from the broker config (msgTraceTopicName).
String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
TopicConfig topicConfig = new TopicConfig(topic);
// Record the trace topic in the system topic list.
this.systemTopicList.add(topic);
// The trace topic is configured with one read queue and one write queue.
topicConfig.setReadQueueNums(1);
topicConfig.setWriteQueueNums(1);
// Publish the config into the topic table under its topic name.
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
|
客户端发送实现
客户端发送
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", true);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| /**
 * Creates a producer for the given group and, when requested, wires up message tracing.
 *
 * <p>Trace setup is best-effort: any failure while creating the dispatcher or registering
 * the hook is logged (including its cause) and does not propagate to the caller.
 *
 * @param producerGroup        producer group name stored on this producer
 * @param rpcHook              hook handed to both the producer implementation and the trace dispatcher
 * @param enableMsgTrace       whether to create a trace dispatcher and register the send-message trace hook
 * @param customizedTraceTopic value passed to the trace dispatcher as its trace topic argument
 */
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    // Only set up tracing if the client opened the message trace feature.
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
            traceDispatcher = dispatcher;
            // Register the trace hook on the producer implementation so it is invoked around message sends.
            this.getDefaultMQProducerImpl().registerSendMessageHook(
                new SendMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            // Deliberately not rethrown: tracing must not prevent the producer from being created.
            // Pass the throwable to the logger so the root cause is not lost.
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
        }
    }
}
|
SendMessageTraceHookImpl 实现了SendMessageHook接口,在消息发送前后会被调用
AsyncTraceDispatcher 主要负责轨迹消息的发送工作:内部维护一个队列,由线程池批量(每批100条)发送
Hook调用
发送前hook调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| // If any send-message hook is registered, run it before the message is sent; message tracing is recorded this way.
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this); // the sending object
context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); // producer group
context.setCommunicationMode(communicationMode); // send (communication) mode
context.setBornHost(this.defaultMQProducer.getClientIP()); // client IP
context.setBrokerAddr(brokerAddr); // address of the broker the message is sent to
context.setMessage(msg); // the message itself
context.setMq(mq); // target message queue
// A message whose PROPERTY_TRANSACTION_PREPARED property equals "true" is tagged as a half transactional message.
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
// A message carrying a start-deliver time or a delay level is tagged as a delay message.
// This check runs after the transactional one, so it overrides that type when both apply.
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context); // run the registered hooks' before-send logic
}
|
发送后hook调用
1 2 3 4 5 6 7 8 9
| // Hooks executed after the message has been sent; message tracing is invoked here.
if (this.hasSendMessageHook()) {
// Attach the send result to the context before handing it to the hooks.
// NOTE(review): context is presumably the one populated before sending; confirm in the enclosing method.
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
|
发送轨迹
Producer启动时注册钩子,该钩子持有负责消息发送的AsyncTraceDispatcher实例,消息发送后进而发送消息轨迹
发送轨迹的消息格式

客户端消费轨迹实现
消费轨迹:与消息发送的轨迹实现思路相同
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1", true);
注册消费钩子
ConsumeMessageTraceHookImpl实现了ConsumeMessageHook,在消费的前后会进行回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| /**
 * Creates a push consumer for the given group and, when requested, wires up consume tracing.
 *
 * <p>Trace setup is best-effort: any failure while creating the dispatcher or registering
 * the hook is logged (including its cause) and does not propagate to the caller.
 *
 * @param consumerGroup                consumer group name stored on this consumer
 * @param rpcHook                      hook handed to both the consumer implementation and the trace dispatcher
 * @param allocateMessageQueueStrategy queue allocation strategy stored on this consumer
 * @param enableMsgTrace               whether to create a trace dispatcher and register the consume-message trace hook
 * @param customizedTraceTopic         value passed to the trace dispatcher as its trace topic argument
 */
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {
    this.consumerGroup = consumerGroup;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
            dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
            traceDispatcher = dispatcher;
            // Register the consume trace hook on the consumer implementation.
            this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
                new ConsumeMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            // Deliberately not rethrown: tracing must not prevent the consumer from being created.
            // Pass the throwable to the logger so the root cause is not lost.
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data", e);
        }
    }
}
|
ConsumeMessageConcurrentlyService.ConsumeRequest.run消费前执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| // Run the hooks before consumption; consume tracing goes through this path.
// The context stays null when no hook is registered.
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
// Start with an empty property map on the context.
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false); // consume status; set to false before the messages are consumed
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
|
消费后执行
1 2 3 4 5 6 7 8 9 10
| if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
// After consumption: record the status text on the context.
consumeMessageContext.setStatus(status.toString());
// Success is true only when the status is CONSUME_SUCCESS.
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
// Hand the updated context to the registered hooks' after-consume logic.
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
|
消费轨迹格式
分为两部分,一部分为消费前,一部分为消费后

