瓜农老梁

一个想分享点干货的家伙,微信公众号「瓜农老梁」

0%

MQ28# RocketMQ存储--主从同步

问题思考

1.消息存储在Master上了,如何同步到Slave上了呢?

2.同步复制和异步复制流程是怎么样的?

Broker启动HA调用链

HA初始化调用链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@1 BrokerStartup#main

start(createBrokerController(args));

@2 BrokerStartup#createBrokerController

boolean initResult = controller.initialize();

@3 BrokerController#initialize

this.messageStore = new DefaultMessageStore

@4 DefaultMessageStore#DefaultMessageStore()

this.haService = new HAService(this);

this.defaultMessageStore = defaultMessageStore;

this.acceptSocketService =

new AcceptSocketService(defaultMessageStore.getMessageStoreConfig()

.getHaListenPort());

this.groupTransferService = new GroupTransferService();

this.haClient = new HAClient();

启动调用链

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@1 BrokerStartup#start

controller.start();

@2 BrokerController#start

this.messageStore.start();

@3 DefaultMessageStore#start

@4 this.haService.start();

this.acceptSocketService.beginAccept();

this.acceptSocketService.start();

this.groupTransferService.start();

this.haClient.start();
1
小结:从初始化和启动调用链中可以看到,在Broker启动时,初始化并启动了三个线程类,分别为AcceptSocketService、GroupTransferService、HAClient。

问题:这三个线程类在干啥?

线程类职责

AcceptSocketService职责

1
小结:AcceptSocketService职责初始化TCP通道,监听新的连接并创建HAConnection。

问题:HAConnection在做什么?

HAConnection职责

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//构造方法

public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {

this.haService = haService;

this.socketChannel = socketChannel;

//获取客户端请求地址

this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();

//将通道调整为非阻塞

this.socketChannel.configureBlocking(false);

//关闭连接前将数据发送完毕

this.socketChannel.socket().setSoLinger(false, -1);

//将Nagle算法关闭,客户端每发送一次数据无论大小,都会将其发送出去

this.socketChannel.socket().setTcpNoDelay(true);

//设置接受缓存区为64K

this.socketChannel.socket().setReceiveBufferSize(1024 * 64);

//设置发包缓存区为64K

this.socketChannel.socket().setSendBufferSize(1024 * 64);

//写数据线程类

this.writeSocketService = new WriteSocketService(this.socketChannel);

//读数据线程类

this.readSocketService = new ReadSocketService(this.socketChannel);

this.haService.getConnectionCount().incrementAndGet();

}

//启动

public void start() {

//启动读数据线程

this.readSocketService.start();

//启动写数据线程

this.writeSocketService.start();

}

疑问:HAConnection除了对通道做了一些设置外,启动了两个线程服务类,分别为readSocketService和writeSocketService,他们职责是什么呢?

writeSocketService职责

流程图

1
小结:writeSocketService主要职责,将数据不断写入socketChannel通道;写入数据的大小为nextTransferFromWhere与最大可读位置getReadPosition之间数据;每次写完传输指针自增this.nextTransferFromWhere += size;每隔5秒发送心跳包到socketChannel通道。

readSocketService职责

流程图

1
小结:readSocketService主要职责解析slave发来的请求位点,并更新push2SlaveMaxOffset为该请求位点;唤醒groupTransferService线程。

GroupTransferService职责

1
小结:GroupTransferService职责判断主从同步是否完成,完成后唤醒消息发送线程

HAClient职责

小结:HAClient职责Slave封装实现类,负责与Master建立连接通道,并从通道中获取数据存储;

并向Master上报Slave存储的最大物理偏移量。

主从同步示意图

主从同步交互消息格式

1.1 Slave上报物理偏移量reportOffset量格式

| 格式 | 说明 |

| — | — |

| 00000018516677754880 | 长度为8位的20位数字 |

1.2 Master写入Slave的信息由Header与Body构成

| 格式 | 说明 |

| — | — |

| 00000018516677754880+size | Header部分由8位物理偏移量+消息体大小构成

| 消息具体内容 | Slave请求的位点与Master可读位置之间的数据

主从同步示意图

源代码清单

* HAService.java

* HAService#AcceptSocketService

* HAService#GroupTransferService

* HAService#HAClient

* HAConnection.java

* HAConnection#ReadSocketService

* HAConnection#WriteSocketService