瓜农老梁

一个想分享点干货的家伙,微信公众号「瓜农老梁」

0%

Broker配置

首先看下broker.conf配置的两个属性

| 属性 |默认值 |

| — | — |

|traceTopicEnable |false |

| msgTraceTopicName | RMQ_SYS_TRACE_TOPIC |

在一个集群中可以配置一台机器专门负责消息轨迹的收集工作,该台机器上配置traceTopicEnable = true,

borker启动的时候自动创建默认轨迹topic

TopicConfigManager.java构造方法,BrokerController在启动的时候,会初始化TopicConfigManager实现trace topic的创建工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{

if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {

String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();

TopicConfig topicConfig = new TopicConfig(topic);

this.systemTopicList.add(topic);

topicConfig.setReadQueueNums(1);

topicConfig.setWriteQueueNums(1);

this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);

}

}
阅读全文 »

问题思考

从官方给的例子入手,代码如下:

示例类:org.apache.rocketmq.example.transaction.TransactionProducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public static void main(String[] args) throws MQClientException, InterruptedException {

//@1 定义TransactionListener

TransactionListener transactionListener = new TransactionListenerImpl();

//@2 使用事务发送Producer

TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

//@3 定义线程池

ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r);

thread.setName("client-transaction-msg-check-thread");

return thread;

}

});

//设置线程池

producer.setExecutorService(executorService);

//设置监听器

producer.setTransactionListener(transactionListener);

producer.setNamesrvAddr("127.0.0.1:9876");

//@4 发送者启动

producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};

for (int i = 0; i < 10; i++) {

try {

Message msg =

new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,

("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

//@5 消息发送

SendResult sendResult = producer.sendMessageInTransaction(msg, null);

System.out.printf("%s%n", sendResult);

Thread.sleep(10);

} catch (MQClientException | UnsupportedEncodingException e) {

e.printStackTrace();

}

}

for (int i = 0; i < 100000; i++) {

Thread.sleep(1000);

}

//发送者关闭

producer.shutdown();

}
阅读全文 »

需求描述

在容器推广中,为了测试容器的性能,需要消息SDK与ECS上在发送和消费的性能对比;在对比消费性能时,发现容器中的消费性能居然是ECS的2倍。容器并发消费的20个线程TPS在3万左右,ECS中20个消费线程TPS在1.5万左右。

问题:配置均采用8C16G,容器中的性能几乎是ECS的两倍,这不科学,事出反常必有妖。

阅读全文 »

发送接口分类

* 按照发送方式分类

  1. 同步发送:等待返回结果

  2. 异步发送:异步回调发送结果

  3. 一次发送:无结果返回

* 按一次发送消息数量分类

  1. 单条消息发送

  2. 批量消息发送

* 按照是否指定MessageQueue分类

  1. 随机选择发送

  2. 指定特定MessageQueue

  3. 自定义MessageQueue选择器

阅读全文 »

消息发送代码

* 需要设置produerGroup

* 需要设置NameServer地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
DefaultMQProducer producer = new DefaultMQProducer("melon-tst");

producer.setNamesrvAddr("localhost:9876");

producer.setVipChannelEnabled(false);

producer.start();

for(int i=0;i<100;i++){

Message msg = new Message("topic_online_test",("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

//msg.setDelayTimeLevel(10);

SendResult sendResult = producer.send(msg);

System.out.printf("%s%n",sendResult);

}

producer.shutdown();
阅读全文 »

Topic创建的方式

Topic的创建分为自动创建和通过命令行创建两种。通过broker配置参数autoCreateTopicEnable设置。 通常可以在非生产环境开启自动创建,生产环境待审批后再进行创建。

* 命令行创建

1
sh bin/mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t threezto-test -r 12 -w 12
阅读全文 »

NameServer启动

* 从生产环境实践来看,NameServer启动使用默认配置即可,运行良好。

1
启动命令:nohup sh bin/mqnamesrv &

* NamesrvStartup.java 启动入口类,NameServer 启动默认端口9876

1
nettyServerConfig.setListenPort(9876)

* 每10秒钟扫描一次,移除失效的broker,同时移除缓存元数据

阅读全文 »

问题思考

消息消费时先从ConsumeQueue中获取物理偏移量,再根据物理偏移量从commitLog中获取具体消息;消息检索时会用到索引文件,其中值得思考的问题:

1.ConsumeQueue构建流程是怎样的?

2.ConsumeQueue数据结构是怎样的?

3.Index索引文件构建流程怎样的?

4.Index数据结构时怎么样的?

阅读全文 »