SHAN


  • Home

  • Archives

【RocketMQ】Broker服务注册

Posted on 2021-03-20

Broker服务注册

BrokerOuterAPI

服务注册

Broker服务注册的详细代码在BrokerOuterAPI的registerBrokerAll方法中:

1.构建Request请求头和请求体,并设置相关的参数;

2.因为Broker会向所有的NameServer注册,所以构建CountDownLatch,用来等待向所有的NameServer注册的任务都执行完毕;

3.使用线程池执行向每一个NameServer注册的任务,底层是通过Netty Client向NameServer发送注册请求的;

4.将注册结果放入结果集中,等待所有NameServer执行完毕返回结果列表;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// 注册的详细代码
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {
// 创建一个列表保存注册结果
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
// 获取NameServer地址列表
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// 构建Request请求头
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
// 设置broker地址
requestHeader.setBrokerAddr(brokerAddr);
// 设置broker id
requestHeader.setBrokerId(brokerId);
// 设置broker name
requestHeader.setBrokerName(brokerName);
// 设置集群名称
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
// 创建请求体
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// 构建了一个CountDownLatch
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
// 使用线程池执行注册任务
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// 向NameServer注册
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
// 添加注册结果
registerBrokerResultList.add(result);
}

log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}

try {
// 等待Broker对所有的NameServer注册完毕
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}

return registerBrokerResultList;
}

服务注册请求的发送

进入到registerBroker看一下服务注册请求发送的详细代码,可以看到底层是使用Netty Client进行请求发送的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
// 创建RemotingCommand,封装请求头和请求体
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
// 设置请求体
request.setBody(body);
// 先不管
if (oneway) {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
// 使用Netty Client发送网络请求
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null;
// 处理请求响应结果
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}

throw new MQBrokerException(response.getCode(), response.getRemark());
}
NettyRemotingClient

进入NettyRemotingClient中的invokeSync方法,看下详细的发送过程:

1.获取一个Channel,也就是获取和NameServer的一个连接,在获取Channel的时候首先会从缓存中获取Channel,如果没有获取到才会创建新的Channel;

2.调用invokeSyncImpl方法,通过Channel向NameServer发送请求;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
  @Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
// 获取当前时间
long beginStartTime = System.currentTimeMillis();
// 获取Channel,也就是获取了一个和NameServer的一个连接
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
// 真正发送请求的地方
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}

// 获取Channel
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {
if (null == addr) {
// 获取Channel
return getAndCreateNameserverChannel();
}
// 从缓存中获取channel
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
// 创建channel
return this.createChannel(addr);
}

创建Channel的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 创建Channel
private Channel createChannel(final String addr) throws InterruptedException {
// 先从缓存中获取连接
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
// 加锁
if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
// 同样是从缓存中获取连接
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {

if (cw.isOK()) {
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}

if (createNewConnection) {
// 创建连接的地方
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
// 将连接放入缓存
this.channelTables.put(addr, cw);
}
} catch (Exception e) {
log.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
} else {
log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
// 先不管
if (cw != null) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
}
} else {
log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
}

return null;
}

发送网络请求的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 发送网络请求的代码
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
final int opaque = request.getOpaque();

try {
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}

responseTable.remove(opaque);
responseFuture.setCause(f.cause());
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});

RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}

return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}

心跳机制

为了让NameServer感知到Broker是否有异常,Broker会定时发送心跳到NameServer,告诉NameServer自己的服务正常。

BrokerController

回到start方法中,可以看到注册了一个定时任务,定时会向NameServer进行注册,默认每30s进行一次注册:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void start() throws Exception {
...
// 定时向NameServer注册
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
// 向NameServer注册
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
...
}

NameServer注册处理

Broker的注册请求处理

NamesrvController

回到NamesrvController的initialize方法中,有一个注册处理器的方法registerProcessor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean initialize() {
...
// 注册请求处理器
this.registerProcessor();
...
}
private void registerProcessor() {
// 是否是集群测试
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
// 注册默认的处理器,创建了一个DefaultRequestProcessor
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}

DefaultRequestProcessor

DefaultRequestProcessor的processRequest对请求进行处理,会判断请求类型,然后处理对应的请求,找到Broker的注册类型,可以看到调用registerBroker方法对Broker的请求进行处理,最终是调用RouteInfoManager的registerBroker进行注册的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
public class DefaultRequestProcessor{
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {

if (ctx != null) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
// 判断请求类型,处理不同的请求
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER: // 如果是Broker的注册请求
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 版本判断
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
// Broker注册
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINFO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:
return this.getConfig(ctx, request);
default:
break;
}
return null;
}
// Broker的注册
public RemotingCommand registerBroker(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
// 构造请求响应
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
// 获取请求头
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);

if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}

TopicConfigSerializeWrapper topicConfigWrapper;
if (request.getBody() != null) {
topicConfigWrapper = TopicConfigSerializeWrapper.decode(request.getBody(), TopicConfigSerializeWrapper.class);
} else {
topicConfigWrapper = new TopicConfigSerializeWrapper();
topicConfigWrapper.getDataVersion().setCounter(new AtomicLong(0));
topicConfigWrapper.getDataVersion().setTimestamp(0);
}
// 这里调用RouteInfoManager的registerBroker对Broker进行注册,可以看到从请求中获取了Broker的信息作为参数传入
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
// 设置响应头
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());

byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
// 返回响应
return response;
}

}

RouteInfoManager

RouteInfoManager的registerBroker对Broker进行了注册:

1.根据集群名称获取对应的broker集合,然后将当前的broker加入集合中;

2.根据Broker的name获取brokerAddrsMap,其中key为broker的id,value为IP+端口,然后处理重复数据,保证brokerAddrTable核心路由表同一个IP和端口只有一条记录;

3.判断是否是Master,如果是并且Topic发生了变化或者是Broker首次注册,将会创建或者更新当前broker的Topic队列信息;

4.记录当前Broker的LiveInfo,Broker每次定时发送请求作为心跳的时候,都会有一个新的BrokerLiveInfo记录Broker的注册信息,包括注册时间、连接channel等信息,后续可以用来感知Broker是否有异常;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
// 构建注册结果
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
// 加锁
this.lock.writeLock().lockInterruptibly();
// 根据集群名称获取对应的broker集合,这里用set集合,当Broker发送心跳机制的时候集合不会有重复数据
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
// 向集合中添加当前的broker
brokerNames.add(brokerName);

boolean registerFirst = false;
// 根据Broker name获取BrokerData
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
// 首次注册的时候为空
if (null == brokerData) {
registerFirst = true;
// 如果获取为空,构造BrokerData加入brokerAddrTable中
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
// 根据broker name获取所有broker的地址,key为brokerId,value为IP:PROT
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
// 这里处理重复数据,保证同一个IP和端口只在brokerAddrTable有一条记录
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
// 如果IP和端口一致,但是brokerID不一致,移除旧的记录
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
// 加入当前的broker信息
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
// 判断是否是首次注册
registerFirst = registerFirst || (null == oldAddr);
// 如果topicConfigWrapper不为空,并且当前的broker是master
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
// 如果Topic有变化或者是首次注册
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
// 获取当前broker的所有的Topic
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
// 遍历topic
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
// 创建或者更新当前broker的Topic队列信息
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 构建BrokerLiveInfo,记录Broker的LiveInfo,当Broker定时发送请求作为心跳的时候,都会有一个新的BrokerLiveInfo记录Broker的注册信息,包括注册时间、连接channel等信息
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}

if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
// 如果不是master
if (MixAll.MASTER_ID != brokerId) {
// 获取master信息
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
// 获取master的LiveInfo
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}

return result;
}

异常感知

在NamesrvController的初始化方法中启动了一个定时任务,调用了routeInfoManager的scanNotActiveBroker方法扫描不活跃的Broker,默认10s扫描一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class NamesrvController {
public boolean initialize() {
...
// 定时任务线程池,启动了routeInfoManager的扫描不活跃的Broker方法
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
...
}

RouteInfoManager

RouteInfoManager的scanNotActiveBroker会扫描超时未发送心跳的Broker,默认是120s,如果超过120s没有收到心跳注册,就会移除Broker的路由信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 扫描不活跃的Broker
public void scanNotActiveBroker() {
// 遍历所有的BrokerLiveInfo
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
// 获取最后一次注册的时间戳
long last = next.getValue().getLastUpdateTimestamp();
// 判断是否超过设定的时间,默认是120s
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
// 将移除Broker的路由信息
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}

注:图片来自于儒猿技术窝-从 0 开始带你成为消息中间件实战高手

参考

儒猿技术窝:从 0 开始带你成为消息中间件实战高手

【RocketMQ】NameServer的启动

Posted on 2021-03-15

NameServer

NameServer是一个注册中心,支持Broker的注册与发现,Broker在启动的时候会向NameServer注册(服务注册),Producer和Consumer会通过NameServer拉取信息,获取Topic的路由信息(服务发现)。

NamesrvStartup

NameServer的启动类是NamesrvStartup。

1.NameServer在启动的时候会创建一个NamesrvController,它是NameServer的核心组件;

在创建NamesrvController的方法中,主要是对一些配置信息的处理,比如在启动的时候命令行中指定了配置文件,那么就会读取这个配置文件的参数,如果没有指定,就使用默认的配置来初始化NameServer(NamesrvConfig)和Netty(NettyServerConfig)网络配置的相关参数。

2.初始化NamesrvController,注册JVM服务器关闭时的钩子函数,以便在关闭的时候做一些处理,最后通过创建的NamesrvController启动NameServer;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public class NamesrvStartup {

private static InternalLogger log;
private static Properties properties = null;
private static CommandLine commandLine = null;

public static void main(String[] args) {
main0(args);
}

// 启动入口
public static NamesrvController main0(String[] args) {

try {
// 创建NamesrvController
NamesrvController controller = createNamesrvController(args);
// 启动
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
// 启动nameserver的时候使用命令启动的,会带一下参数,这里处理命令行相关参数
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 创建NamesrvConfig,主要是一些nameserver的相关配置参数
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// 创建Netty服务相关配置,主要是Netty的相关配置参数
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 设置Netty默认端口为9876
nettyServerConfig.setListenPort(9876);
// 如果启动的时候命令行参数中带了-c选项,也就是启动的时候指定了配置文件
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
// 读取指定的配置文件,从文件中解析参数设置到namesrvConfig和nettyServerConfig中
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);

namesrvConfig.setConfigStorePath(file);

System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 如果带了-p参数,就会打印NameServer的所有配置信息
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
// 将启动命令时所带的参数覆盖到namesrvConfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 如果ROCKET_HOME为空,输出异常日志
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 日志相关的设置
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 打印日志
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);

return controller;
}
// 启动
public static NamesrvController start(final NamesrvController controller) throws Exception {

if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化controller
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注册关闭时的钩子函数,在JVM关闭的时候做一些处理
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 启动NameServer
controller.start();

return controller;
}

NamesrvController

初始化

NmaeServer作为一个注册中心,是需要接收Broker的注册,以及Producer和Consumer的数据拉取,NameServer必然要启动一个服务器接收这些请求,所以初始化代码中,主要是初始化Netty服务器,和一些定时任务线程池。

进入controller.initialize()方法,看一下初始化的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class NamesrvController {
public boolean initialize() {
// 加载一些kv配置数据
this.kvConfigManager.load();
// 创建NettyRemotingServer,初始化Netty服务器
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 设置netty服务器的工作线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册处理器,先不管
this.registerProcessor();
// 定时任务线程池,启动了routeInfoManager的扫描不活跃的Broker方法
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 定时任务,打印KV配置信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 先不管
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}

return true;
}

}

启动

初始化Controller之后就可以启动NameServer了,其实主要就是启动Netty服务器:

1
2
3
4
5
6
7
8
9
// 启动NameServer
public void start() throws Exception {
// 启动Netty服务器
this.remotingServer.start();

if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}

NettyRemotingServer

进入NettyRemotingServer的start的方法,这里主要是Netty相关的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});

prepareSharableHandlers();

ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});

if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}

try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}

if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}

this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
}

总结

NameServer的启动主要就是初始化相关配置参数,构建一个基于Netty的网络服务器,启动Netty服务器,接收Broker和Producer、Consumer的网络请求,进行网络通信。

注:图片来自于儒猿技术窝-从 0 开始带你成为消息中间件实战高手

参考

儒猿技术窝:从 0 开始带你成为消息中间件实战高手

【Spring Cloud】Ribbon调用过程

Posted on 2021-02-13

负载均衡的实现方式

@RibbonClient

@LoadBalanced

@LoadBalanced实现负载均衡的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
public class ServiceribbonApplication {

public static void main(String[] args) {
SpringApplication.run(ServiceribbonApplication.class, args);
}

@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
}

restTemplate调用:

1
2
3
4
5
6
7
8
9
10
11
@Service
public class HelloService {

@Autowired
RestTemplate restTemplate;


public String helloService(String name){
return restTemplate.getForObject("http://SERVICE-HI/hello?name="+name,String.class);
}
}

@LoadBalanced注解的原理可参考:

胖波:Ribbon中@LoadBalanced注解的原理

Robbon负载均衡源码分析

RestTemplate

进入restTemplate的getForObject方法,进行一系列的调用,最终会进入到doExecute方法中,将请求信息封装为ClientHttpRequest对象,调用它的execute方法执行请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
@Override

@Nullable
public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {
RequestCallback requestCallback = this.acceptHeaderRequestCallback(responseType);
HttpMessageConverterExtractor<T> responseExtractor = new HttpMessageConverterExtractor(responseType, this.getMessageConverters(), this.logger);
// 调用了execute方法
return this.execute(url, HttpMethod.GET, requestCallback, responseExtractor, (Object[])uriVariables);
}


@Nullable
public <T> T execute(String url, HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {
URI expanded = this.getUriTemplateHandler().expand(url, uriVariables);
// 调用doExecute方法
return this.doExecute(expanded, method, requestCallback, responseExtractor);
}

@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {

Assert.notNull(url, "URI is required");
Assert.notNull(method, "HttpMethod is required");
ClientHttpResponse response = null;
try {
// 封装请求
ClientHttpRequest request = createRequest(url, method);
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
// 调用execute方法执行请求
response = request.execute();
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
catch (IOException ex) {
String resource = url.toString();
String query = url.getRawQuery();
resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
throw new ResourceAccessException("I/O error on " + method.name() +
" request for \"" + resource + "\": " + ex.getMessage(), ex);
}
finally {
if (response != null) {
response.close();
}
}
}

}

HttpAccessor

调用createRequest方法是在HttpAccessor类中实现的,在该方法中调用了getRequestFactory获取RequestFactory,由于RestTemplate继承了InterceptingHttpAccessor,InterceptingHttpAccessor又继承了HttpAccessor,所以执行getRequestFactory的时候会进入到InterceptingHttpAccessor的getRequestFactory创建请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class HttpAccessor { 
// 默认是SimpleClientHttpRequestFactory
private ClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();

// 使用工厂模式创建ClientHttpRequest
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
if (logger.isDebugEnabled()) {
logger.debug("HTTP " + method.name() + " " + url);
}
return request;
}
}

InterceptingHttpAccessor

在InterceptingHttpAccessor中的getRequestFactory中,会判断拦截器是否为空,如果不为空判断当前RequestFactory是否为空,如果为空将创建一个InterceptingClientHttpRequestFactory类型的工厂:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class InterceptingHttpAccessor extends HttpAccessor {
@Override
public ClientHttpRequestFactory getRequestFactory() {
// 获取拦截器
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
// 如果拦截器不为空
if (!CollectionUtils.isEmpty(interceptors)) {
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
// 如果factory为空
if (factory == null) {
// 创建InterceptingClientHttpRequestFactory
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}
}

InterceptingClientHttpRequestFactory

InterceptingClientHttpRequestFactory当然创建的是InterceptingClientHttpRequest类型的请求类,所以最终会进入到InterceptingClientHttpRequest的execute方法中:

1
2
3
4
5
6
7
8
9
10
public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {

private final List<ClientHttpRequestInterceptor> interceptors;

@Override
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}

}

InterceptingClientHttpRequest

InterceptingClientHttpRequest继承了AbstractClientHttpRequest,它们都是ClientHttpRequest的子类,InterceptingClientHttpRequest中有一个内部类InterceptingRequestExecution,它有一个拦截器类型的ClientHttpRequestInterceptor迭代器iterator,还实现了execute方法,在execute方法中会获取拦截器,对请求进行拦截:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {

private class InterceptingRequestExecution implements ClientHttpRequestExecution {

private final Iterator<ClientHttpRequestInterceptor> iterator;

public InterceptingRequestExecution() {
this.iterator = interceptors.iterator();
}

@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
// 获取拦截器
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
}

ClientHttpRequestInterceptor的实现类如下:

LoadBalancerInterceptor

进入到LoadBalancerInterceptor拦截器中,查看拦截器的intercept方法,在该方法中调用了LoadBalancerClient的execute方法执行请求,也就是说请求会被负载均衡拦截器所拦截,然后调用负载均衡客户端 LoadBalancerClient的execute方法执行请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

// 负载均衡客户端
private LoadBalancerClient loadBalancer;

private LoadBalancerRequestFactory requestFactory;

public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
LoadBalancerRequestFactory requestFactory) {
this.loadBalancer = loadBalancer;
this.requestFactory = requestFactory;
}

public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
// for backwards compatibility
this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
}

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
// 获取服务名称
String serviceName = originalUri.getHost();
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
// 调用负载均衡客户端的execute方法执行请求
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}
}

RibbonLoadBalancerClient

LoadBalancerClient是一个接口,RibbonLoadBalancerClient实现了它,在execute方法中,首先调用getLoadBalancer方法获取LoadBalancer的实现类,选取一个负载均衡实现类,默认情况下会返回ZoneAwareLoadBalancer的负载均衡器,然后调用getServer(在此方法中又调用了chooseServer),根据负载均衡规则选取一个服务,之后就知道往哪个服务上发送请求,执行发送请求操作了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public class RibbonLoadBalancerClient implements LoadBalancerClient {


@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
throws IOException {
return execute(serviceId, request, null);
}

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
// 获取LoadBalancer实现类
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
// 根据负载均衡规则选取一个服务
Server server = getServer(loadBalancer, hint);
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
// 调用execute方法执行请求
return execute(serviceId, ribbonServer, request);
}


protected ILoadBalancer getLoadBalancer(String serviceId) {
// 获取LoadBalancer实现类
return this.clientFactory.getLoadBalancer(serviceId);
}

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// 调用chooseServer选择一个服务
return loadBalancer.chooseServer(hint != null ? hint : "default");
}

// 执行发送请求的方法
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance,
LoadBalancerRequest<T> request) throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}

RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId);
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

try {
// 执行请求
T returnVal = request.apply(serviceInstance);
statsRecorder.recordStats(returnVal);
return returnVal;
}
// catch IOException and rethrow so RestTemplate behaves correctly
catch (IOException ex) {
statsRecorder.recordStats(ex);
throw ex;
}
catch (Exception ex) {
statsRecorder.recordStats(ex);
ReflectionUtils.rethrowRuntimeException(ex);
}
return null;
}

}

负载均衡策略

ILoadBalancer

ILoadBalancer是一个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ILoadBalancer {
void addServers(List<Server> var1);

Server chooseServer(Object var1);

void markServerDown(Server var1);

/** @deprecated */
@Deprecated
List<Server> getServerList(boolean var1);

List<Server> getReachableServers();

List<Server> getAllServers();
}

BaseLoadBalancer中实现了chooseServer方法,在BaseLoadBalancer中有一个默认的负载均衡策略是RoundRobinRule:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {

private final static IRule DEFAULT_RULE = new RoundRobinRule();
// 默认策略RoundRobinRule
protected IRule rule = DEFAULT_RULE;
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
// 选取服务
return rule.choose(key);
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
}

七种负载均衡策略

  1. RoundRobinRule
  2. RandomRule
  3. AvailabilityFilteringRule
  4. WeightedResponseTimeRule
  5. RetryRule
  6. BestAvailableRule
  7. ZoneAvoidanceRule

参考:

【掘金小册】LinkedBear:SpringCloudNetflix 源码解读与原理分析

SpringBoot版本:2.1.9

SpringCloud版本: Greenwich.SR4

SpringBoot自动装配原理

Posted on 2021-01-10

SpringBootApplication

springboot项目一般都会有一个主程序入口,使用@SpringBootApplication注解标注:

1
2
3
4
5
6
7
@SpringBootApplication
public class SpringbootdemoApplication implements CommandLineRunner{

public static void main(String[] args) {
SpringApplication.run(SpringbootdemoApplication.class, args);
}
}

点进@SpringBootApplication,可以看到又引入了其他的注解,主要关注以下几个注解:

  • @SpringBootConfiguration
  • @ComponentScan
  • @EnableAutoConfiguration
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(
excludeFilters = {@Filter(
type = FilterType.CUSTOM,
classes = {TypeExcludeFilter.class}
), @Filter(
type = FilterType.CUSTOM,
classes = {AutoConfigurationExcludeFilter.class}
)}
)
public @interface SpringBootApplication {
@AliasFor(
annotation = EnableAutoConfiguration.class
)
Class<?>[] exclude() default {};

@AliasFor(
annotation = EnableAutoConfiguration.class
)
String[] excludeName() default {};

@AliasFor(
annotation = ComponentScan.class,
attribute = "basePackages"
)
String[] scanBasePackages() default {};

@AliasFor(
annotation = ComponentScan.class,
attribute = "basePackageClasses"
)
Class<?>[] scanBasePackageClasses() default {};

@AliasFor(
annotation = ComponentScan.class,
attribute = "nameGenerator"
)
Class<? extends BeanNameGenerator> nameGenerator() default BeanNameGenerator.class;

@AliasFor(
annotation = Configuration.class
)
boolean proxyBeanMethods() default true;
}

@SpringBootConfiguration

@SpringBootConfiguration比较简单,它只导入了一个@Configuration注解,所以可以理解为@SpringBootConfiguration就是一个配置类。

1
2
3
4
5
6
7
8
9
10
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Configuration
public @interface SpringBootConfiguration {
@AliasFor(
annotation = Configuration.class
)
boolean proxyBeanMethods() default true;
}

@ComponentScan

使用过Spring的应该都很熟悉了,这个注解用来配置扫描包里的文件,将符合条件的bean注册到Spring容器中。
excludeFilters:配置不进行扫描的过滤器。

1
2
3
4
5
6
7
8
@ComponentScan(
excludeFilters = {@Filter(
type = FilterType.CUSTOM,
classes = {TypeExcludeFilter.class}
), @Filter(
type = FilterType.CUSTOM,
classes = {AutoConfigurationExcludeFilter.class}
)}

@EnableAutoConfiguration

@EnableAutoConfiguration是SpringBoot自动装配的核心,它由@AutoConfigurationPackage和@Import组成,@Import导入了AutoConfigurationImportSelector类:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import({AutoConfigurationImportSelector.class})
public @interface EnableAutoConfiguration {
String ENABLED_OVERRIDE_PROPERTY = "spring.boot.enableautoconfiguration";

Class<?>[] exclude() default {};

String[] excludeName() default {};
}

@AutoConfigurationPackages

1
2
3
4
5
6
7
8
9
10
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import({Registrar.class})
public @interface AutoConfigurationPackage {
String[] basePackages() default {};

Class<?>[] basePackageClasses() default {};
}

AutoConfigurationPackages表示使用该注解的类所在的包会被加入到自动扫描package中,也就是SpringBoot会将主程序类所在的包及其下面的所有子包下的组件扫描到Spring容器中,主要是通过导入Registrar类实现的:

1
2
3
4
5
6
7
8
9
10
11
12
static class Registrar implements ImportBeanDefinitionRegistrar, DeterminableImports {
Registrar() {
}

public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
AutoConfigurationPackages.register(registry, (String[])(new AutoConfigurationPackages.PackageImports(metadata)).getPackageNames().toArray(new String[0]));
}

public Set<Object> determineImports(AnnotationMetadata metadata) {
return Collections.singleton(new AutoConfigurationPackages.PackageImports(metadata));
}
}

AutoConfigurationImportSelector

在AutoConfigurationImportSelector的selectImports方法中,调用了getAutoConfigurationEntry方法,在getAutoConfigurationEntry又调用了getCandidateConfigurations,获取所有候选的配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class AutoConfigurationImportSelector implements DeferredImportSelector, BeanClassLoaderAware, ResourceLoaderAware, BeanFactoryAware, EnvironmentAware, Ordered {
...

public String[] selectImports(AnnotationMetadata annotationMetadata) {
if(!this.isEnabled(annotationMetadata)) {
return NO_IMPORTS;
} else {
// 调用getAutoConfigurationEntry方法
AutoConfigurationImportSelector.AutoConfigurationEntry autoConfigurationEntry = this.getAutoConfigurationEntry(annotationMetadata);
return StringUtils.toStringArray(autoConfigurationEntry.getConfigurations());
}
}

protected AutoConfigurationImportSelector.AutoConfigurationEntry getAutoConfigurationEntry(AnnotationMetadata annotationMetadata) {
if(!this.isEnabled(annotationMetadata)) {
return EMPTY_ENTRY;
} else {
AnnotationAttributes attributes = this.getAttributes(annotationMetadata);
// 调用getCandidateConfigurations,获取所有候选的配置
List<String> configurations = this.getCandidateConfigurations(annotationMetadata, attributes);
configurations = this.removeDuplicates(configurations);
Set<String> exclusions = this.getExclusions(annotationMetadata, attributes);
this.checkExcludedClasses(configurations, exclusions);
configurations.removeAll(exclusions);
configurations = this.getConfigurationClassFilter().filter(configurations);
this.fireAutoConfigurationImportEvents(configurations, exclusions);
return new AutoConfigurationImportSelector.AutoConfigurationEntry(configurations, exclusions);
}
}

// 获取所有候选的配置
protected List<String> getCandidateConfigurations(AnnotationMetadata metadata, AnnotationAttributes attributes) {
// SpringFactoriesLoader.loadFactoryNames可以获取指定类的自动配置类
List<String> configurations = SpringFactoriesLoader.loadFactoryNames(this.getSpringFactoriesLoaderFactoryClass(), this.getBeanClassLoader());
Assert.notEmpty(configurations, "No auto configuration classes found in META-INF/spring.factories. If you are using a custom packaging, make sure that file is correct.");
return configurations;
}

...

在getCandidateConfigurations中,有一步是调用loadFactoryNames,并且传入了两个参数,其中有一个是通过getSpringFactoriesLoaderFactoryClass返回的EnableAutoConfiguration类,另外一个传入的是类的加载器:

1
2
3
4
5
6
7
8
9
// 返回EnableAutoConfiguration类
protected Class<?> getSpringFactoriesLoaderFactoryClass() {
return EnableAutoConfiguration.class;
}

// 返回类的加载器
protected ClassLoader getBeanClassLoader() {
return this.beanClassLoader;
}

再进入到SpringFactoriesLoader的loadFactoryNames方法,主要的处理在loadSpringFactories方法中,在这个方法中,先通过给定的类的加载器从缓存中获取对应的自动配置类,如果不为空直接返回接口,如果为空,根据指定的类加载器加载META-INF下的spring.factories文件(使用了SPI机制),然后将内容解析放到Map中,并加入到缓存中,之后就可以根据类的加载器名称获取对应自动配置类,加载到Spring容器中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public final class SpringFactoriesLoader {

......

public static List<String> loadFactoryNames(Class<?> factoryType, @Nullable ClassLoader classLoader) {
ClassLoader classLoaderToUse = classLoader;
if(classLoader == null) {
classLoaderToUse = SpringFactoriesLoader.class.getClassLoader();
}

String factoryTypeName = factoryType.getName();
return (List)loadSpringFactories(classLoaderToUse).getOrDefault(factoryTypeName, Collections.emptyList());
}

private static Map<String, List<String>> loadSpringFactories(ClassLoader classLoader) {
// 根据当前的类加载器,从缓存中获取数据
Map<String, List<String>> result = (Map)cache.get(classLoader);
if(result != null) {
return result;
} else {
HashMap result = new HashMap();

try {
// 通过给定的类加载器从META-INF下的spring.factories文件
Enumeration urls = classLoader.getResources("META-INF/spring.factories");
// 解析spring.factories文件的内容
while(urls.hasMoreElements()) {
URL url = (URL)urls.nextElement();
UrlResource resource = new UrlResource(url);
Properties properties = PropertiesLoaderUtils.loadProperties(resource);
Iterator var6 = properties.entrySet().iterator();

while(var6.hasNext()) {
Entry<?, ?> entry = (Entry)var6.next();

String factoryTypeName = ((String)entry.getKey()).trim();
String[] factoryImplementationNames = StringUtils.commaDelimitedListToStringArray((String)entry.getValue());
String[] var10 = factoryImplementationNames;
int var11 = factoryImplementationNames.length;

for(int var12 = 0; var12 < var11; ++var12) {
// 将spring.factories文件的内容解析放入到result中
String factoryImplementationName = var10[var12];
((List)result.computeIfAbsent(factoryTypeName, (key) -> {
return new ArrayList();
})).add(factoryImplementationName.trim());
}
}
}

result.replaceAll((factoryType, implementations) -> {
return (List)implementations.stream().distinct().collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
});
// 将result的内容加入到缓存中
cache.put(classLoader, result);
return result;
} catch (IOException var14) {
throw new IllegalArgumentException("Unable to load factories from location [META-INF/spring.factories]", var14);
}
}
}
}

选取了spring-boot-autoconfiguer下的spring.factories文件内容,key为接口名称,value为对应的自动装配类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# Application Listeners
org.springframework.context.ApplicationListener=\
org.springframework.boot.autoconfigure.BackgroundPreinitializer

# Auto Configuration Import Listeners
org.springframework.boot.autoconfigure.AutoConfigurationImportListener=\
org.springframework.boot.autoconfigure.condition.ConditionEvaluationReportAutoConfigurationImportListener

# Auto Configuration Import Filters
org.springframework.boot.autoconfigure.AutoConfigurationImportFilter=\
org.springframework.boot.autoconfigure.condition.OnBeanCondition,\
org.springframework.boot.autoconfigure.condition.OnClassCondition,\
org.springframework.boot.autoconfigure.condition.OnWebApplicationCondition

# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.boot.autoconfigure.admin.SpringApplicationAdminJmxAutoConfiguration,\
org.springframework.boot.autoconfigure.aop.AopAutoConfiguration,\
org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration,\
org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration,\
org.springframework.boot.autoconfigure.cache.CacheAutoConfiguration,\
org.springframework.boot.autoconfigure.cassandra.CassandraAutoConfiguration,\
org.springframework.boot.autoconfigure.context.ConfigurationPropertiesAutoConfiguration,\
org.springframework.boot.autoconfigure.context.LifecycleAutoConfiguration,\
org.springframework.boot.autoconfigure.context.MessageSourceAutoConfiguration,\
org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration,\
org.springframework.boot.autoconfigure.couchbase.CouchbaseAutoConfiguration,\
org.springframework.boot.autoconfigure.dao.PersistenceExceptionTranslationAutoConfiguration,\
org.springframework.boot.autoconfigure.data.cassandra.CassandraDataAutoConfiguration,\
org.springframework.boot.autoconfigure.data.cassandra.CassandraReactiveDataAutoConfiguration,\
org.springframework.boot.autoconfigure.data.cassandra.CassandraReactiveRepositoriesAutoConfiguration,\
org.springframework.boot.autoconfigure.data.cassandra.CassandraRepositoriesAutoConfiguration,\
org.springframework.boot.autoconfigure.data.couchbase.CouchbaseDataAutoConfiguration,\
org.springframework.boot.autoconfigure.data.couchbase.CouchbaseReactiveDataAutoConfiguration,\
org.springframework.boot.autoconfigure.data.couchbase.CouchbaseReactiveRepositoriesAutoConfiguration,\
org.springframework.boot.autoconfigure.data.couchbase.CouchbaseRepositoriesAutoConfiguration,\
org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration,\
org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchRepositoriesAutoConfiguration,\
org.springframework.boot.autoconfigure.data.elasticsearch.ReactiveElasticsearchRepositoriesAutoConfiguration,\
org.springframework.boot.autoconfigure.data.elasticsearch.ReactiveElasticsearchRestClientAutoConfiguration,\
org.springframework.boot.autoconfigure.data.jdbc.JdbcRepositoriesAutoConfiguration,\
org.springframework.boot.autoconfigure.data.jpa.JpaRepositoriesAutoConfiguration,\
org.springframework.boot.autoconfigure.data.ldap.LdapRepositoriesAutoConfiguration,\
org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration,\

回到getCandidateConfigurations方法中,调用loadFactoryNames方法时传入的类为EnableAutoConfiguration类,所以loadFactoryNames最终可以根据传入的类加载器,从缓存中获取EnableAutoConfiguration在spring.factories中对应的自动配置类(如果缓存中没有获取到,就去加载spring.factories文件并解析后放入缓存),之后将自动配置类导入Spring容器,自动配置类生效,完成自动装配。

参考:
掘金小册-SpringBoot 源码解读与原理分析
SpringBoot自动装配原理解析
SpringBoot自动装配原理初探
SpringBoot自动配置的原理详解
SpringBoot 2.X课程学习 | 第三篇:自动配置(Auto-configuration)

SpringBoot版本:2.4.1

【MySQL】sort buffer和join buffer

Posted on 2020-05-16

sort buffer和join buffer两者没什么关系,只是最近重读了极客时间的MySQL实战,又加深了对MySQL的认知,这里记录一下sort buffer和join buffer两个知识点,以便加深印象。

sort buffer

sort buffer从名字上就可以看出肯定和排序有关,MySQL会为每个线程分配一块内存,用于排序,这个内存就称为sort buffer。

内存的大小可以通过sort_buffer_size参数控制,如果需要排序的数据量超过了sort_buffer_size的大小,就需要借助磁盘临时文件进行外部排序,一般使用归并排序算法,所以排序的数据可能需要被切分,每一份单独排序后存在临时文件中,最后将所有的文件合并成一个有序的大文件。

可以通过打开optimizer_trace,查看optimizer_trace结果中的number_of_tmp_files,判断是否使用了临时文件以及临时文件的个数。

全字段排序

假设有一个表t有name、age、city字段,在city上创建了索引,现在执行:

1
select city,name,age from t where city = '杭州' order by name limit 1000;

语句的执行流程:

(1)初始化sort_buffer,放入city、name、age三个字段;

(2)通过city上的索引,找到第一个city是杭州的节点,通过节点中保存的主键id去主键索引寻找该条记录,取出name、city和age字段的值,放入到sort_buffer中;

(3)继续寻找下一个满足条件的主键,通过id在主键索引中取出三个字段的值放入sort_buffer;

(4)重复步骤2和3,直到找到不满足条件的city为止;

(5)对sort_buffer中的数据按照name字段做排序;

(6)按照排序结果取前1000行返回给客户端;

rowid排序

在全字段排序中,假如select中取的字段太多,sort_buffer可能放不下,就需要借助外部排序,排序的性能会下降,max_length_for_sort_data参数可以修改排序的单行最大长度,在全字段排序中取了city、name和age字段,三个字段的数据类型长度加起来就是排序的单行长度,假设全排序中单行长度超过了max_length_for_sort_data,MySQL为了提高性能,会采用rowid排序。

rowid的排序流程:

(1)初始化sort_buffer,放入name(因为需要根据name进行排序)和id;

(2)从city索引中找到第一个city为杭州的数据的id,到主键索引中找到对应的记录,取name和id两个字段的值,放入sort_buffer;

(3)继续寻找下一个满足条件的值,将id和name放入sort_buffer;

(4)重复步骤2和3,知道找到不满足的条件的city为止;

(5)对sort_buffer中的数据按照name排序;

(6)根据排序的结果取前1000行,因为sort_buffer中只放了name和id,没有age和city,所以此时需要根据id再次从主键所以中回表取出city 、name和age的值,最后返回给客户端;

注:图片来源于极客时间-林晓斌(丁奇):MySQL实战

可以发现与字段全排序相比,rowid比字段全排序多了一次最后根据id从主键索引回表的过程。

总结:

1.如果sort_buffer足够大,会优先选择字段全排序,把需要的字段全放入sort_buffer中,排序后可直接返回结果;

2.如果sort_buffer无法满足排序的数量,会采用rowid排序算法,但是需要根据id回表查询所需的字段;

优化:

如果根据索引取到的数据就是有序的,那么就可以省去排序的过程了,所以可以在表上建一个city和name的联合索引,此时的查询流程如下:

注:图片来源于极客时间-林晓斌(丁奇):MySQL实战

(1)从联合索引(city,name)中找到第一个满足city是杭州的数据的主键id;

(2)到主键索引中根据主键id取name、city、age三个字段的值,返回结果到客户端;

(3)继续取下一个记录的id;

(4)重复步骤2和3,直到第1000条记录或者不满足条件的时候为止;

可以看到如果加了city和name的联合索引,可以省去排序的过程,再继续优化,借助覆盖索引,创建city、name和age的联合索引,这样可以把回表的步骤也省掉了:

(1)从联合索引(city,name,age)中找到第一个满足city是杭州的节点,因为创建的是联合索引,索引节点中包含city、name、age的值,直接将结果集返回给客户端即可;

(2)从联合索引取下一个满足条件的记录,返回数据给客户端;

(3)重复执行步骤2,直到第1000条记录或者不满足条件为止;

join buffer

Index Nested-Loop Join(NLJ)

假设有两张表t1和t2,表结构一致,除了主键id之外只有两个字段a和b,在a字段上创建索引,然后执行语句:

1
select * from t1 straight_join t2 on (t1.a=t2.a);

执行流程:

(1)从表t1读入一行数据R;

(2)从数据行R中读取a字段的值,到表t2中查找,由于t2表中a字段创建了索引,所以可以借助索引在表t2中查找;

(3)将t2表中查找到满足条件的所有行,与t1做关联;

(4)重复执行步骤1到3,遍历表t1,直到所有数据被读完;

在这个过程中,t1属于驱动表,t2属于被驱动表,驱动表t1做了全表扫描,每次取出一行,然后取出字段a的值再从t2中借助索引进行查找,完成表数据行的关联,由于在被驱动表中查找时使用到了索引,所以这种查询流程称为Index Nested-Loop Join。

Simple Nested-Loop Join

修改一下查询语句,假如执行下面这个查询:

1
select * from t1 straight_join t2 on (t1.a=t2.b);

表t2的字段b是没有索引的,所以此时的查询流程如下:

(1)从表t1读入一行数据R;

(2)从数据行R中读取a字段的值,到表t2中查找,由于t2表中b字段没有索引,所以只能对t2进行全表扫描,查找满足条件的值;

(3)将t2表中查找到满足条件的所有行,与t1做关联;

(4)重复执行步骤1到3,遍历表t1,直到所有数据被读完;

假如驱动表的行数为N,被驱动表的行数为M,如果被驱动表上没有可用的索引,那么将会扫描NM行,可以看到在N=100,M=1000时,NM = 100000,只是这么小的数据量下就要扫描10万行,使用此算法代价会比较大,所以有了BNL算法Block Nested-Loop Join。

Block Nested-Loop Join(BNL)

在被驱动表上没有可用的索引时,BNL算法的执行流程:

(1)将表t1的数据放入线程内存join_buffer中,由于语句是select * 所以t1的所有数据会被加载到join_buffer中;

(2)扫描表t2,取出表t2的每一行,与join_buffer中的数据做对比,如果满足条件,将数据关联,作为结果集的一部分返回;

注:图片来源于极客时间-林晓斌(丁奇):MySQL实战

假设表t1的行数为N,t2的行数为M:

  • 两张表都做了一次全表扫描,扫描行数是M+N;
  • 表t1的每一行都要和表t2的每一行作比较,判断次数为MN,虽然还是MN但是是在内存中做的判断,因此性能会提示很多;

假如表t1的数据量太大,join_buffer中放不下,那么将会采用分段放的方式:

(1)将表t1的数据放入join_buffer中,假设在第50行的时候join_buffer满了,进行第2步;

(2)扫描表t2,取出表t2的每一行,与join_buffer中的数据做对比,如果满足条件,将数据关联,作为结果集的一部分返回;

(3)清空join_buffer;

(4)继续扫描t1,将剩余的行放入join_buffer,重复步骤1、2、3;

总结:

(1)尽量使用小表做驱动表;

(2)使用BNL算法在大表上的join操作可能要扫描被驱动表多次,占用大量的IO资源,在判断join条件时可能需要大量的判断占用CPU资源,尽量避免这种join;

(3)会导致Buffer Pool的热数据被淘汰,影响内存命中率;

Multi-Range Read 优化

我们知道,MySQL InnoDB的二级索引叶子节点中存储的是主键的值,如果需要查询其他字段的值,需要根据主键的值去主键索引上查找,才能拿到其他字段的值,这个操作称为回表,假设有一表t,在字段a上创建了索引,执行如下语句:

1
select * from t1 where a>=1 and a<=100;

执行流程:

(1)在字段a上建立的二级索引中找到第一个a值大于1的节点,找到对应的主键;

(2)根据主键值去主键索引中查找,获取该行记录所有字段的值,作为返回结果集;

(3)循环步骤1和2,直到把小于等于100的值全部寻找完毕;

在这个过程中,由于每次只根据一个主键ID去主键索引中查找,每次查找就变成了随机访问,性能相对较差,因为大部分数据都是按照主键递增顺序插入得到的,假如按照主键递增顺序查找的话,对磁盘的读比较接近顺序读,这样就可以提升读性能,这就是MRR优化的思路。

MRR优化后的执行流程:

(1)根据a字段上的二级索引找到所有满足条件的记录,将所有的主键ID值放入read_rnd_buffer中;

(2)将read_rnd_buffer中的id进行递增排序;

(3)根据read_rnd_buffer中排好序的id依次到主键索引中查询记录,并作为返回结果;

MRR的核心思想在于得到足够多的主键ID,然后对ID排序,再去主键索引查找数据,将随机访问改为尽量接近顺序访问。

Batched Key Access优化

MySQL在5.6版本之后引入了Batched Key Access(BKA) 算法,它可以对 Index Nested-Loop Join(NLJ) 进行优化。

NLJ算法的流程是从驱动表t1,一行一行的取出a值,再到被驱动表t2中根据字段a的索引去查找,对于表t2每次只匹配一个值,为了让t2表可以一次性多拿到一些值,可以把t1表的数据取出来之后先放到临时内存中,然后批量的去t2表的索引上去查询,我们知道在NLJ算法中并没有用到join_buffer,所以join_buffer就可以充当临时内存。

BKA算法的核心思想就是借助join_buffer批量的从被驱动表的索引中查询数据。

参考

极客时间 — 林晓斌(丁奇):MySQL实战

【MySQL】MVVC机制

Posted on 2020-05-03

MySQL隔离级别

读未提交(Read Uncommitted):某个事务读到了其他还未提交的事务对数据所作的修改,也就是某个事务只要修改了数据,其他事务就可以看到所作的修改。

这种隔离级别下会发生脏读、不可重复读、幻读。

读提交(Read Committed):某个事务提交之后,才可以被其他事务看到。

这种隔离级别下会发生不可重复读、幻读。

可重复读(Repeatable Read):一个事务在执行过程中看到的数据,总是跟这个事务在启动的时候看到的数据是一致的。

MySQL的默认隔离级别,这种隔离级别下会发生幻读。

不可重复读侧重于数据的修改,幻读侧重于数据的新增。假设事务A开始之前某个字段的值是A,事务B将它的值修改为了B,此时事务A再次查询得到的结果是B,与事务A开始时查到的值A不一致,就是不可重复读。假设事务A开始之前查询某个表只有一条记录,事务B往表中又插入了一条记录,此时事务A再次查询莫名多出一条记录,就是幻读。

串行化(Serializable):对于同一条记录,读会加读锁,写会加写锁,读和读之间不会冲突,但是读和写之间会冲突,假如读写冲突,后访问的事务需要等待前一个事务执行完毕才可以继续执行。

因为不允许事务并发执行,只能串行执行,因此不会有脏读、不可重复读、幻读问题,但是由于串行执行,性能比较差。

Read View一致性视图

InnoDB中,每个事务在开始的时候都会申请一个事物id,这样每个事务会有一个唯一的事务id,每行数据也会有多个版本,每次事务更新的时候,会生成对应的事务版本,将事物id设置到这个事务版本的trx_id中,旧的事务版本也会保留:

注意:图中的版本链不是物理上真实存在的,实际上是根据undo log版本链计算出来的。

undo log版本链

由于对数据的修改会生成undo log回滚日志,每个事务又会生成对应的事务版本,那么多个事务修改的时候就会形成一个undo log 版本链,每条数据包含两个隐藏字段,trx_id和roll_pointer:

trx_id:最近一次更新这条数据的事务id

roll_pointer:指向当前事务之前生成的那个undo log

所以多个事务串行执行的时候,每个事务都会生成一条undo log,通过roll_pointer将undo log串起来,形成undo log版本链,当没有事务需要用到这些undo log时,undo log才会被删除,假如有长事务,由于随时可能访问数据库的任何数据,在这个事务提交之前,它可能用到的undo log都将会保留,就会导致大量的占用存储空间,因此需要尽量避免长事务。

一致性视图

事务在启动的时候,InnoDb会为每一个事务创建一个数组,保存这个事务启动时,当前所有正在活跃(事务已启动但是还没提交)的事务ID,数组中事务ID最小值为低水位,已经生成过的事务的最大值的ID加1记作高水位,视图数组和高水位组成了当前事务的一致性视图,基于事务的trx_id和一致性视图可以实现读提交和不可重复读这两个隔离级别:

注:图片来自于极客时间 — 林晓斌(丁奇):MySQL实战

对于当前事务来说,一个事务的id可能有以下三种情况:

1.在视图数组中已提交的事务中,表示这个版本是已提交的事务,那么它是可以被当前事务读到的;

2.在未开始事务中,表示这个版本的数据是由将来启动的事务生成的,对当前事务来说必然是不可见的;

3.在未提交的事务集合中,有两种情况:

(1)如果trx_id在当前事务的活跃数组中,表示这个版本是由未提交的事务生成的,对当前事务来说不可见;

(2)如果trx_id不在当前事务的活跃数组中,表示这个版本的事务已提交,对当前事务来说可见;

基于ReadView实现读提交

假设系统当前存在两个活跃的事务B和事务C:

(1)事务B第一次查询id值时会顺着undo log版本链寻找,首先会读取undo log版本链最新的值,undo log版本链中当前值为30,对应的trx_id 为30,trx_id比事务B中max_trx_id小,说明事务B在生成read view之前就存在这个活跃的事务,由于trx_id为30的事务在事务B的活跃列表中
在读提交隔离级别下,是不能读取到30这个值的,继续顺着版本链往前找,下一个是trx_id为10更新的值,trx_id =10 比事务B中的min_trx_id小,说明事务B生成read view之前,trx_id为10的事务就已经提交,所以可以读取trx_id=10的事务更新的值,事务B第一次查询得到ID的值为10;

(2)事务B在第二次查询时,会重新生成read view,此时由于事务C已提交,trx_id为30的事务不在事务B的活跃事务列表中,此时事务B是可以读取到30的,所以事务B第二次读取到ID的值为30;

实现读提交比较重要的一点就是每次在查询时都会重新生成read view。

基于ReadView实现可重复读

假设系统当前存在两个活跃的事务B和事务C:

(1)与读提交中的第一次查询一致,得到的值为10;

(2)事务B在第二次查询时,会继续使用第一次查询生成的一致性视图,此时虽然事务C已提交,但是由于没有重新生成一致性视图,trx_id为30的事务依旧在事务B的活跃事务列表中,此时事务B是不可以读取到30的,所以事务B第二次读取到ID的值为10;

实现不可重复读比较重要的一点就是整个事务都会使用一个一致性视图,不是每次查询都重新生成一致性视图。

总结:

在读提交隔离级别下,每个语句执行前都会重新算出一个一致性视图。

在可重复读隔离级别下,只在事务开始的时候创建一致性视图,之后事务里的其他查询都共用这个一致性视图。

当前读

当前读指的是读取undo log版本链中最新的记录

1
2
select xx from xx lock in share;
select xx from xx for update;

lock in share mode会加读锁,for update会加写锁,这两种语句都会进行当前读。

快照读

指的是按照生成的一致性视图读取数据。

需要注意的时,如果是更新操作都是当前读。

创建持续整个事务的一个一致性快照:

1
start transaction with consistent snapshot;

在读提交隔离级别下,没有意义,等效于普通的start transaction。

参考:

极客时间 — 林晓斌(丁奇):MySQL实战

救火队队长:从零开始带你成为MySQL实战优化高

【Spring Cloud】Eureka缓存机制

Posted on 2020-03-15

Eureka分为Client端和Server端,Client端向Server端注册自己的服务信息,并且拉取所有服务的注册信息,Server端作为注册中心,负责接收Client端的注册信息,维护所有服务的注册信息,Server端也可以开启集群模式,相互之间同步服务的注册信息。

与缓存相关的三个变量

1.registry
1
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry

AbstractInstanceRegistry的成员变量,相当于服务端的注册表,维护所有服务的注册信息,第一层Map的key是服务的应用名称,value也是一个map,其中key是实例id,value是实例的详细信息(Lease)。

2.readWriteCacheMap
1
private final LoadingCache<Key, ResponseCacheImpl.Value> readWriteCacheMap;

ResponseCacheImpl的成员变量,使用的是guava的LoadingCache构建的缓存,其中key为com.netflix.eureka.registry包下的Key对象,value是ResponseCacheImpl的内部类Value对象,从名称上可以看出它是一个读写缓存。

3.readOnlyCacheMap
1
private final ConcurrentMap<Key, ResponseCacheImpl.Value> readOnlyCacheMap = new ConcurrentHashMap();

ResponseCacheImpl的成员变量,同样是一个ConcurrentMap,从名称上可以看出它是一个只读的缓存对象。

所以Eureka Server使用了一个读写分离的两级缓存机制,registry负责维护所有服务的注册信息,当register数据有变化,将会更新readWriteCacheMap的内容,后台开启定时任务,默认30s从readWriteCacheMap同步数据到readOnlyCacheMap,Eureka Client拉取服务的注册信息时,从readOnlyCacheMap读取数据,并没有直接从readWriteCacheMap中获取数据:

Client从Server端拉取数据流程:

(1)Client端DiscoveryClient的构造函数中,会初始化定时任务;

(2)缓存刷新任务,定时发送请求到Server端,拉取服务的注册数据,Server端收到请求,首先会根据请求信息构建key,根据key从缓存中获取数据;

(3)Server端根据key获取缓存数据时,会判断是否开启了使用readOnly缓存:

如果开启:先从readOnlyCacheMap获取,如果从readOnlyCacheMap未获取到数据,再从readWriteCacheMap中查找;

如果未开启:直接从readWriteCacheMap中获取;

(4)从readWriteCacheMap获取数据也有两种结果:

第一:获取到数据,如果开启了readOnly缓存,会将结果设置到readOnlyCacheMap中,否则直接返回结果即可;

第二:未获取到数据,由于readWriteCacheMap使用的是guava构建的缓存,从readWriteCacheMap根据key获取数据,假如key不存在的活,就会触发load方法,在这个方法中会调用generatePayload方法从registry中获取数据构并建缓存数据返回;

(5)Server端将结果设置到response返回到Client端;

源码

Eureka Server

AbstractInstanceRegistry

以register服务注册方法看一下缓存更新的流程,在register方法中,服务进行注册的时候,会根据服务的应用名称appName,调用invalidateCache方法清除之前存在的缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public abstract class AbstractInstanceRegistry implements InstanceRegistry{

// server端的注册表,key是服务的应用名称,value又是一个map, key是实例id,value是实例的详细信息(Lease<InstanceInfo>)
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();

...

// 注册服务
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
this.read.lock();
// 根据应用名称从注册表中获取实例
Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
EurekaMonitors.REGISTER.increment(isReplication);
// 如果获取的实例为空
if(gMap == null) {
// 创建一个hashmap
ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
// 加入注册表
gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
if(gMap == null) {
gMap = gNewMap;
}
}
// 根据实例ID获取,实例的详细信息
Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());

...

registrant.setActionType(ActionType.ADDED);
this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 清除缓存
this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), Boolean.valueOf(isReplication)});
} finally {
this.read.unlock();
}

}

// 清除缓存
private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
this.responseCache.invalidate(appName, vipAddress, secureVipAddress);
}

}

ResponseCacheImpl

(1)ResponseCacheImpl中包含了readOnlyCacheMap只读缓存对象和readWriteCacheMap读写缓存对象,其中readWriteCacheMap使用的是guava的缓存机制,如果根据key从readWriteCacheMap获取数据时没有这个key,将会调用load方法,在load方法中是调用generatePayload构建Value对象的,在generatePayload是从registry获取数据的,也就是说当readWriteCacheMap不存在某个key是会从registry获取数据。

(2)在构造函数中,初始化了readWriteCacheMap对象,并判断是否使用readOnlyCacheMap缓存,如果开启,设置定时任务getCacheUpdateTask,定时从readWriteCacheMap同步数据到readOnlyCacheMap,同步方式是遍历readOnlyCacheMap,判断value是否与readWriteCacheMap的数据一致,如果不一致,以readWriteCacheMap的数据为准,更新readOnlyCacheMap的数据。

(3)invalidate方法用来清除readWriteCacheMap中的缓存.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
public class ResponseCacheImpl implements ResponseCache {
...
// readOnlyCacheMap缓存
private final ConcurrentMap<Key, ResponseCacheImpl.Value> readOnlyCacheMap = new ConcurrentHashMap();
// guava构建的readWriteCacheMap缓存
private final LoadingCache<Key, ResponseCacheImpl.Value> readWriteCacheMap;
...

// 构造函数
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
// 是否使用只读缓存
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
// 缓存更新间隔
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
// 构建缓存对象,初始容量为1000
this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(1000).expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS).removalListener(new RemovalListener<Key, ResponseCacheImpl.Value>() {
public void onRemoval(RemovalNotification<Key, ResponseCacheImpl.Value> notification) {
Key removedKey = (Key)notification.getKey();
if(removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
ResponseCacheImpl.this.regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}

}
}).build(new CacheLoader<Key, ResponseCacheImpl.Value>() {
// 如果缓存中key不存在,会调用load方法
public ResponseCacheImpl.Value load(Key key) throws Exception {
if(key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
ResponseCacheImpl.this.regionSpecificKeys.put(cloneWithNoRegions, key);
}
// 根据key构建缓存数据
ResponseCacheImpl.Value value = ResponseCacheImpl.this.generatePayload(key);
return value;
}
});
// 如果使用只读缓存
if(this.shouldUseReadOnlyResponseCache) {
// 注册定时任务,同步数据
this.timer.schedule(this.getCacheUpdateTask(), new Date(System.currentTimeMillis() / responseCacheUpdateIntervalMs * responseCacheUpdateIntervalMs + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs);
}

try {
Monitors.registerObject(this);
} catch (Throwable var7) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", var7);
}

}

// 清除缓存
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
KeyType[] var4 = KeyType.values();
int var5 = var4.length;

for(int var6 = 0; var6 < var5; ++var6) {
KeyType type = var4[var6];
Version[] var8 = Version.values();
int var9 = var8.length;

for(int var10 = 0; var10 < var9; ++var10) {
Version v = var8[var10];
// 实际上调用的invalidate(Key... keys)方法
this.invalidate(new Key[]{new Key(EntityType.Application, appName, type, v, EurekaAccept.full), new Key(EntityType.Application, appName, type, v, EurekaAccept.compact), new Key(EntityType.Application, "ALL_APPS", type, v, EurekaAccept.full), new Key(EntityType.Application, "ALL_APPS", type, v, EurekaAccept.compact), new Key(EntityType.Application, "ALL_APPS_DELTA", type, v, EurekaAccept.full), new Key(EntityType.Application, "ALL_APPS_DELTA", type, v, EurekaAccept.compact)});
if(null != vipAddress) {
this.invalidate(new Key[]{new Key(EntityType.VIP, vipAddress, type, v, EurekaAccept.full)});
}

if(null != secureVipAddress) {
this.invalidate(new Key[]{new Key(EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full)});
}
}
}

}

public void invalidate(Key... keys) {
Key[] var2 = keys;
int var3 = keys.length;

for(int var4 = 0; var4 < var3; ++var4) {
Key key = var2[var4];
logger.debug("Invalidating the response cache key : {} {} {} {}, {}", new Object[]{key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()});
// 根据key清除缓存
this.readWriteCacheMap.invalidate(key);
Collection<Key> keysWithRegions = this.regionSpecificKeys.get(key);
if(null != keysWithRegions && !keysWithRegions.isEmpty()) {
Iterator var7 = keysWithRegions.iterator();

while(var7.hasNext()) {
Key keysWithRegion = (Key)var7.next();
logger.debug("Invalidating the response cache key : {} {} {} {} {}", new Object[]{key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()});
this.readWriteCacheMap.invalidate(keysWithRegion);
}
}
}

}

// 定时任务,从readWriteCacheMap同步数据到readOnlyCacheMap
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
public void run() {
ResponseCacheImpl.logger.debug("Updating the client cache from response cache");
Iterator var1 = ResponseCacheImpl.this.readOnlyCacheMap.keySet().iterator();
// 遍历readOnlyCacheMap
while(var1.hasNext()) {
Key key = (Key)var1.next();
if(ResponseCacheImpl.logger.isDebugEnabled()) {
ResponseCacheImpl.logger.debug("Updating the client cache from response cache for key : {} {} {} {}", new Object[]{key.getEntityType(), key.getName(), key.getVersion(), key.getType()});
}

try {
CurrentRequestVersion.set(key.getVersion());
// 从readWriteCacheMap根据key获取数据
ResponseCacheImpl.Value cacheValue = (ResponseCacheImpl.Value)ResponseCacheImpl.this.readWriteCacheMap.get(key);
// 从readOnlyCacheMap获取数据
ResponseCacheImpl.Value currentCacheValue = (ResponseCacheImpl.Value)ResponseCacheImpl.this.readOnlyCacheMap.get(key);
// 对比数据是否一致
if(cacheValue != currentCacheValue) {
// 如果数据不一致,更新readOnlyCacheMap的数据 ResponseCacheImpl.this.readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable var5) {
ResponseCacheImpl.logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), var5);
}
}

}
};
}

// 根据key生成ResponseCacheImpl.Value对象
private ResponseCacheImpl.Value generatePayload(Key key) {
Stopwatch tracer = null;

ResponseCacheImpl.Value var8;
try {
String payload;
switch(null.$SwitchMap$com$netflix$eureka$registry$Key$EntityType[key.getEntityType().ordinal()]) {
case 1:
boolean isRemoteRegionRequested = key.hasRegions();
// 全量获取
if("ALL_APPS".equals(key.getName())) {
if(isRemoteRegionRequested) {
tracer = this.serializeAllAppsWithRemoteRegionTimer.start();
payload = this.getPayLoad(key, this.registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = this.serializeAllAppsTimer.start();
// 从registry获取数据
payload = this.getPayLoad(key, this.registry.getApplications());
}
} else if("ALL_APPS_DELTA".equals(key.getName())) {// 增量获取
if(isRemoteRegionRequested) {
tracer = this.serializeDeltaAppsWithRemoteRegionTimer.start();
this.versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = this.getPayLoad(key, this.registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = this.serializeDeltaAppsTimer.start();
this.versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = this.getPayLoad(key, this.registry.getApplicationDeltas());
}
} else {
tracer = this.serializeOneApptimer.start();
payload = this.getPayLoad(key, this.registry.getApplication(key.getName()));
}
break;
case 2:
case 3:
tracer = this.serializeViptimer.start();
payload = this.getPayLoad(key, getApplicationsForVip(key, this.registry));
break;
default:
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
}

var8 = new ResponseCacheImpl.Value(payload);
} finally {
if(tracer != null) {
tracer.stop();
}

}

return var8;
}
}

ApplicationsResource

ApplicationsResource的getContainers接收Client端的拉取请求,在getContainers方法中,首先会根据请求信息构建缓存Key,根据key从缓存中获取数据,并设置到response中返回给Client端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ApplicationsResource {
@GET
public Response getContainers(@PathParam("version") String version, @HeaderParam("Accept") String acceptHeader, @HeaderParam("Accept-Encoding") String acceptEncoding, @HeaderParam("X-Eureka-Accept") String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if(!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions);
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}

if(!this.registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
} else {
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = KeyType.JSON;
String returnMediaType = "application/json";
if(acceptHeader == null || !acceptHeader.contains("json")) {
keyType = KeyType.XML;
returnMediaType = "application/xml";
}
// 构建缓存key
Key cacheKey = new Key(EntityType.Application, "ALL_APPS", keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);
Response response;
// 如果acceptEncoding是gizp
if(acceptEncoding != null && acceptEncoding.contains("gzip")) {
response = Response.ok(this.responseCache.getGZIP(cacheKey)).header("Content-Encoding", "gzip").header("Content-Type", returnMediaType).build();
} else {
// 从缓存中根据key获取数据设置到response中
response = Response.ok(this.responseCache.get(cacheKey)).build();
}
return response;
}
}
}

回到ResponseCacheImpl,get方法用来根据key从缓存中获取数据,最终调用的是getValue方法,在getValue方法中可以看到,如果开启了只读缓存,先从readOnlyCacheMap中获取数据,如果未获取到再从readWriteCacheMap中获取,如果未开启只读缓存,直接从readWriteCacheMap中获取,如果readWriteCacheMap不存在某个key,可以回看ResponseCacheImpl构造函数初始化readWriteCacheMap时有一个load方法,不存在某个key时触发该方法,实际上是去registry中取数据并构建缓存数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ResponseCacheImpl implements ResponseCache {
...
// 根据keky获取缓存数据
public String get(Key key) {
return this.get(key, this.shouldUseReadOnlyResponseCache);
}

@VisibleForTesting
String get(Key key, boolean useReadOnlyCache) {
// 根据key从缓存map中获取value
ResponseCacheImpl.Value payload = this.getValue(key, useReadOnlyCache);
return payload != null && !payload.getPayload().equals("")?payload.getPayload():null;
}

@VisibleForTesting
ResponseCacheImpl.Value getValue(Key key, boolean useReadOnlyCache) {
ResponseCacheImpl.Value payload = null;

try {
// 如果使用ReadOnlyCache
if(useReadOnlyCache) {
// 从ReadOnlyCache获取缓存数据
ResponseCacheImpl.Value currentPayload = (ResponseCacheImpl.Value)this.readOnlyCacheMap.get(key);
// 如果获取不为空
if(currentPayload != null) {
payload = currentPayload;
} else {
// 如果获取为空,从readWriteCacheMap中获取数据
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
// 将获取的数据加入readOnlyCacheMap中
this.readOnlyCacheMap.put(key, payload);
}
} else {
// 如果不使用readOnlyCacheMap,直接从readWriteCacheMap获取数据
payload = (ResponseCacheImpl.Value)this.readWriteCacheMap.get(key);
}
} catch (Throwable var5) {
logger.error("Cannot get value for key : {}", key, var5);
}

return payload;
}
}

Eureka Client

DiscoveryClient

(1)在DiscoveryClient的构造方法中,会判断是否需要拉取数据,并且初始化定时任务;

(2)初始化定时任务方法中,会添加缓存更新的定时任务,是在CacheRefreshThread的run方法中实现的,在run方法中调用refreshRegistry刷新缓存,在refreshRegistry方法中又调用了fetchRegistry,所以最终使用的fetchRegistry方法从服务端拉取数据;

(3)fetchRegistry方法中会判断是全量还是增量从Server端拉取数据;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
public class DiscoveryClient implements EurekaClient {

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) {
...
// 如果需要拉取数据
if(this.clientConfig.shouldFetchRegistry() && !this.fetchRegistry(false)) {
// 从备用服务拉取数据
this.fetchRegistryFromBackup();
}
...

// 初始化定时任务
this.initScheduledTasks();

...
}

// 初始化定时任务
private void initScheduledTasks() {
int renewalIntervalInSecs;
int expBackOffBound;
// 是否需要拉取数据
if(this.clientConfig.shouldFetchRegistry()) {
renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();
expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 注册刷新缓存的定时任务
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
}

if(this.clientConfig.shouldRegisterWithEureka()) {
renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: renew interval is: {}", Integer.valueOf(renewalIntervalInSecs));
// 注册发送心跳的定时任务
this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread(null)), (long)renewalIntervalInSecs, TimeUnit.SECONDS);
// 构建InstanceInfoReplicator,其run方法中会调用discoveryClient.register()发送HTTP请求进行服务注册
this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);
this.statusChangeListener = new StatusChangeListener() {
public String getId() {
return "statusChangeListener";
}

public void notify(StatusChangeEvent statusChangeEvent) {
if(InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {
DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);
} else {
DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);
}

DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();
}
};
if(this.clientConfig.shouldOnDemandUpdateStatusChange()) {
this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);
}

this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}

}

// 缓存刷新任务
class CacheRefreshThread implements Runnable {
CacheRefreshThread() {
}

public void run() {
// 调用refreshRegistry刷新缓存
DiscoveryClient.this.refreshRegistry();
}
}

@VisibleForTesting
void refreshRegistry() {
try {
...

// 调用fetchRegistry从服务端拉取数据
boolean success = this.fetchRegistry(remoteRegionsModified);
if(success) {
this.registrySize = ((Applications)this.localRegionApps.get()).size();
this.lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}

...
} catch (Throwable var9) {
logger.error("Cannot fetch registry from server", var9);
}

}

// 拉取数据
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = this.FETCH_REGISTRY_TIMER.start();

label122: {
boolean var4;
try {
Applications applications = this.getApplications();
if(!this.clientConfig.shouldDisableDelta() && Strings.isNullOrEmpty(this.clientConfig.getRegistryRefreshSingleVipAddress()) && !forceFullRegistryFetch && applications != null && applications.getRegisteredApplications().size() != 0 && applications.getVersion().longValue() != -1L) {
// 增量拉取数据
this.getAndUpdateDelta(applications);
} else {
logger.info("Disable delta property : {}", Boolean.valueOf(this.clientConfig.shouldDisableDelta()));
logger.info("Single vip registry refresh property : {}", this.clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", Boolean.valueOf(forceFullRegistryFetch));
logger.info("Application is null : {}", Boolean.valueOf(applications == null));
logger.info("Registered Applications size is zero : {}", Boolean.valueOf(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", Boolean.valueOf(applications.getVersion().longValue() == -1L));
// 全量拉取数据
this.getAndStoreFullRegistry();
}

applications.setAppsHashCode(applications.getReconcileHashCode());
this.logTotalInstances();
break label122;
} catch (Throwable var8) {
logger.error("DiscoveryClient_{} - was unable to refresh its cache! status = {}", new Object[]{this.appPathIdentifier, var8.getMessage(), var8});
var4 = false;
} finally {
if(tracer != null) {
tracer.stop();
}

}

return var4;
}
// 缓存刷新事件
this.onCacheRefreshed();
this.updateInstanceRemoteStatus();
return true;
}
}

参考:

sharedCode:深入理解Eureka缓存机制(八)

[xmz_java:Eureka 缓存结构以及服务感知优化](https://www.cnblogs.com/xmzJava/p/11359636.html)

[宜信技术:详解Eureka 缓存机制](https://www.cnblogs.com/yixinjishu/p/10871243.html)

Spring Cloud版本:Finchley.RELEASE

【MySQL】更新语句执行流程

Posted on 2020-02-09

buffer pool

buffer pool缓冲池,用来缓存数据页,避免每次都从磁盘上加载数据,由于buffer pool是基于内存的,所以查询速度非常快。

undo log

undo log回滚日志,记录了SQL执行之前的旧值,用来做数据回滚。

redo log

redo log重做日志,是InnoDB存储引擎的日志,记录的是数据页的物理修改,在某个数据页上做了什么修改,用来恢复数据使用。

binlog

binlog 数据库的二进制日志,同样记录了数据的修改,binlog是MySQL的Server层实现的,所有引擎都可以使用。

  1. 当一条更新SQL语句执行时,首先会查看数据是否在缓冲池中,如果在更新缓冲池中的数据,如果不在,会从磁盘中将数据加载到缓冲池buffer pool(对于非唯一索引来说,可以使用change buffer,将修改先记录在change buffer中,之后再merge到原数据页)。
  2. 当数据加载到buffer pool中之后,首先会在undo log中记录修改前的数据,便于数据回滚。
  3. 更新buffer pool中的缓存数据,注意此时只是在缓存池中更新了数据,并没有更新到磁盘上,如果数据库宕机,数据会丢失。
  4. 将修改写入redo log中,redo log也有缓冲区redo log buffer,所以也就是会将数据先写入redo log buffer中,记录对数据的修改,由于redo log buffer也基于内存,所以此时数据库宕机依旧有丢失数据的风险。
  5. 提交事务,当提交事务之后,会根据一定的策略将redo log buffer的数据刷入redo log磁盘文件中,此时如果数据库宕机,重启的时候即可根据redo log文件进行数据恢复。
  6. 将更新内容写入binlog磁盘文件。
  7. 将更新对应的binlog文件名称和更新的binlog日志文件位置写入redo log文件中,并写入commit标记,用来保持redo log和binlog的一致性,此时mysql异常宕机,可以通过redo log和binlog的一致性判断是否需要恢复数据。
  8. 此时事务完成提交,需要注意的是此时数据只是在buffer pool中修改了,并且记录到了redo log和binlog,此时磁盘数据文件中还是旧数据,所以MySql有一个后台的IO线程,会根据配置决定何时将buffer pool中的脏数据刷新到磁盘上的数据文件中。

redo log的写入机制

innodb_flush_log_at_trx_commit:

  • 设置为 0 的时候,表示每次事务提交时都只是把 redo log 留在 redo log buffer 中 ;
  • 设置为 1 的时候,表示每次事务提交时都将 redo log 直接持久化到磁盘;
  • 设置为 2 的时候,表示每次事务提交时都只是把 redo log 写到 page cache。

InnoDB 有一个后台线程,每隔 1 秒,就会把 redo log buffer 中的日志,调用 write 写到文件系统的 page cache,然后调用 fsync 持久化到磁盘。

binlog的写入机制

系统为每个线程分配一片内存为binlog做缓存(binlog cache),事务执行过程中,先将内容写入binlog cache,当事务提交时,会把binlog cache写到binlog文件中,然后清空binlog cache,由于binlog存储在文件系统的 page cache,并没有持久化到磁盘,所以系统根据sync_binlog的配置判断何时将binlog文件的内容fsync到磁盘。

binlog_cache_size:

在事务写入binlog cache时,一个事务的binlog是不能被拆分的,需要确保一次性写入,binlog_cache_size用于控制binlog cache的大小。

sync_binlog:

  • sync_binlog=0 的时候,表示每次提交事务都只 write,不 fsync;
  • sync_binlog=1 的时候,表示每次提交事务都会执行 fsync;
  • sync_binlog=N(N>1) 的时候,表示每次提交事务都 write,但累积 N 个事务后才 fsync。

change buffer

我们知道,更新一条数据的时候,如果数据页就在buffer pool中,直接更新buffer pool中的数据,如果数据页不在buffer pool中,对于非唯一索引来说,由于不需要检查唯一性,InnoDb会将这些更新操作先缓存在change buffer中,可以避免从磁盘中加载数据页,减少磁盘的IO,在下次需要访问这个数据页的数据时,再从磁盘加载到内存,将changer buffer中的更新操作应用到原数据页中,这个过程被称为merge:

对于唯一索引,所有的更新操作必须要检查是否违法唯一约束,假如数据页不在buffer pool,必须要将数据页读入内存才可以判断(这里可以理解为读取索引页,通过索引判断唯一性),所以change buffer对于唯一索引并不适用。

change buffer是buffer pool的一部分,innodb_change_buffer_max_size参数可以设置其在buffer pool中的占比。

change buffer的适用场景:

适用于写多读少的场景,假如写入之后马上就要查询,即便已经将更新记录在了change buffer,由于需要查询,必须要从磁盘加载数据页才能读取到数据,然后触发merge操作,这样并不会减少磁盘的IO操作。

change buffer和redo log的侧重点:

change buffer主要是为了减少随机读磁盘的IO消耗。

redo log主要是为了减少随机写磁盘的IO消耗,redo log将磁盘的随机IO转为了顺序写,提升了速度。

参考:

救火队队长:从零开始带你成为MySQL实战优化高手

极客时间 — 林晓斌(丁奇):MySQL实战

MySQL 之 InnoDB引擎 Change Buffer

【JAVA】AbstractQueuedSynchronizer源码学习

Posted on 2019-11-18

AQS内部的同步队列

AQS内部的同步队列是一个双向队列,队列中的元素是AQS中的Node节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
 static final class Node {
// 共享锁
static final Node SHARED = new Node();
// 独占锁
static final Node EXCLUSIVE = null;

/** 取消状态,处于取消状态的线程不再竞争锁,等待被GC回收 */
static final int CANCELLED = 1;

static final int SIGNAL = -1;

static final int CONDITION = -2;

static final int PROPAGATE = -3;

/**
* 等待状态
*/
volatile int waitStatus;

/**
* 前置节点,当前节点的前一个节点
*/
volatile Node prev;

/**
* 后置节点,当前节点的下一个节点
*/
volatile Node next;

/**
* 节点绑定的线程
*/
volatile Thread thread;

Node nextWaiter;

......
}

waitStatus等待状态的五种情况

  • CANCELLED:对应的值为1,表示节点对应的线程被取消,处于取消状态的节点不会再竞争锁
  • SIGNAL:对应的值为-1,表示该节点后面的节点的线程处于等待获取锁的状态
  • CONDITION:对应的值为-2,表示节点在等待队列中
  • PROPAGATE:对应的值为-3,表示下一次共享式同步状态获取将被无条件的传播
  • 节点的初始状态:对应的值为0

源码

1. acquire()

(1)调用tryAcquire获取锁,如果获取失败调用addWaiter为当前的线程创建Node节点,并调用acquireQueued处理创建的节点:

1
2
3
4
5
6
7
8
9
10
/**
* 获取锁
* @param arg
*/
public final void acquire(int arg) {
// 调用tryAcquire获取锁,如果获取失败,调用addWaiter将任务封装成Node加入队列,调用acquireQueued处理创建的节点
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

AQS中并没有实现tryAcquire方法,只是在方法中抛出了一个异常,需要由AQS的子类来实现该方法:

1
2
3
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

2. addWaiter()

addWaiter方法主要为当前线程创建节点,然后返回创建的节点,在这个方法中有一个队列尾节点为空的判断:

(1)如果尾节点tail不为空,说明队列中已经存在节点,直接将新建的节点加到队列的尾部,然后通过CAS方式将当前节点置为尾节点即可。

(2)如果尾节点为空,说明同步队列中还没有节点,调用enq方法初始化队列的头结点和尾节点,并将当前节点加入队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 创建节点
*/
private Node addWaiter(Node mode) {
// 创建节点,绑定当前线程
Node node = new Node(Thread.currentThread(), mode);
// 尾节点
Node pred = tail;
// 如果尾节点不为空
if (pred != null) {
// 将新的节点加入到尾节点之后
node.prev = pred;
// CAS将新建的节点设置成尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果尾节点为空,在enq中初始化head和tail节点之后再把新节点插入到队列后边
enq(node);
return node;
}

enq()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 初始化队列的头结点和尾节点,并将传入的节点加入队列
* @param node
* @return
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 创建一个新节点,当做头结点,并设置尾节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 将传入的节点插入到头结点之后并设为尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

3. acquireQueued()

acquireQueued方法主要用来对节点进行处理,在方法中有一个for循环不断的在尝试获取锁以及判断当前任务是否应该被阻塞等待:

判断当前节点的前一个节点是否为头结点:

(1)当前节点的前一个节点是头结点,因为当前节点的前一个节点是头结点,此时重新调用tryAcquire方法尝试获取锁,如果获取成功将当前节点置为头结点,并将之前的头结点的next置为空,等待被回收,然后返回线程的中断状态,如果获取失败继续for循环的流程。

(2)当前节点的前一个节点不是头结点,调用shouldParkAfterFailedAcquire方法设置节点的等待状态,shouldParkAfterFailedAcquire方法如果返回true(返回true表示当前节点已处于等待状态可以被阻塞),执行parkAndCheckInterrupt方法,这个方法用来阻塞当前线程直到被唤醒,之后返回线程的中断状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果当前节点的前一个节点为头结点,重新调用tryAcquire方法尝试获取锁
if (p == head && tryAcquire(arg)) {
// 如果获取锁成功,将当前节点置为头结点
setHead(node);
// 将之前的头结点的next置为空
p.next = null; // help GC
failed = false;
// 返回中断状态
return interrupted;
}
// 如果当前节点的前一个节点不是头结点
// shouldParkAfterFailedAcquire中设置当前节点的前一个节点的等待状态,让前一个节点知道当前节点进入等待
// parkAndCheckInterrupt让当前线程进入阻塞状态,直到被唤醒之后,继续执行,并返回中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;// 记录中断状态
}
} finally {
if (failed)
cancelAcquire(node);
}
}

shouldParkAfterFailedAcquire()

在shouldParkAfterFailedAcquire方法中,==主要是为了让当前节点处于等待状态==,设置的方式是找到当前节点之前第一个waitStatus等于或小于0的节点,然后设置prev的waitStatus。

为什么要找到waitStatus等于或小于0的节点?

通过waitStatus的几种值可以知道大于0的时候表示这个节点处于取消状态,之后是要从队列中移除的,所以要不断向前找,直到找到waitStatus等于或小于0的节点。

因为节点的等待状态可能会发生变化,所以先判断当前节点的前一个节点的等待状态waitStatus:

(1)等待状态是-1,表示这个节点之后的节点处于等待锁的状态,此时直接返回true即可。

(2)等待状态大于0,表示pred节点可能已被取消,此时要跳过这个节点,一直往上一个节点找,直到找到一个等待状态小于或者等于0的节点,并将这个节点的next指向当前节点(此时shouldParkAfterFailedAcquire返回的结果为false)。

(3)如果当前节点的前一个节点perd的等待状态既不等于-1,也不大于0,此时将pred的等待状态设为-1,表示这个节点之后的节点处于等待锁的状态(此时shouldParkAfterFailedAcquire返回的结果为false)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 当前节点的前一个节点的等待状态
int ws = pred.waitStatus;
// 如果是signal状态(-1),返回true
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// 如果当前节点的前一个节点的等待状态大于0,从前一个节点开始一直往前找,直到找到等待状态不大于0的节点,返回false
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 当前节点的前一个节点的状态既不等于-1,也不大于0,设置当前节点的前一个节点的状态为signal(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

由上可知shouldParkAfterFailedAcquire中当前节点的前一个节点的等待状态是-1时返回的是true,其他情况返回false,但是返回false之后下一次再执行shouldParkAfterFailedAcquire,就可以满足条件返回true(因为调用了compareAndSetWaitStatus方法设置前一个节点的等待状态是-1,所以第二次进入可以满足条件)。

parkAndCheckInterrupt()

parkAndCheckInterrupt方法用来阻塞当前线程,然后返回线程的中断状态。LockSupport.park(this)会让当前线程进入阻塞状态,直到被其他线程唤醒(或者被中断),然后会调用Thread.interrupted()返回线程的中断位:

  • 如果当前线程处于中断状态,Thread.interrupted()返回true,下次调用Thread.interrupted()将会返回false,因为中断位被重置了。
  • 如果当前线程处于非中断状态,Thread.interrupted()返回false。
1
2
3
4
5
6
7
8
9
10
/**
* 阻塞线程
* @return
*/
private final boolean parkAndCheckInterrupt() {
// 阻塞当前的线程
LockSupport.park(this);
// 返回当前线程的中断位
return Thread.interrupted();
}

回到acquiredQueue方法:

如果parkAndCheckInterrupt返回true,将 interrupted 设为 true,记录了线程的中断状态,当线程尝试获取锁成功之后,将interrupted中断中断状态返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;//返回中断状态
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//设置中断状态
}
} finally {
if (failed)
cancelAcquire(node);//取消竞争锁
}
}

为什么要记录中断状态?

假设一个线程被中断唤醒,Thread.interrupted()返回线程的中断状态true,之后在acquireQueued方法中并没有中断线程的相关操作,被中断之后还在尝试获取锁,获取成功之后将中断状态返回(此时返回true),再回到acquire方法中,如果acquireQueued返回true代表线程被中断过,此时调用selfInterrupt方法,来中断当前线程,由于调用Thread.interrupted()的时候会重置中断位,此时不能靠Thread.interrupted()判断中断位,所以要记录一下中断状态表明线程被中断过,然后根据这个状态在acquire方法做中断处理。

1
2
3
4
5
6
public final void acquire(int arg) {
// 调用tryAcquire获取锁,如果获取失败,调用addWaiter将任务封装成Node,调用acquireQueued将node加入队列
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

4.cancelAcquire()

在acquireQueued方法的finally语句块中,如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态。
cancelAcquire()方法中,首先也要找到当前节点之前第一个waitStatus<=0的节点pred,此时当前节点有三种情况:

(1)当前节点如果是尾节点,当前节点直接从队尾删掉即可,先将pred的next置为空,然后pred设为尾节点。

(2)pred不是头结点,也就是说当前节点处于队列中间,并且它的前一个节点pred(指的是第一个waitStatus<=0的节点)不是头结点,此时判断pred的等待状态,如果是SIGNAL或者非SIGNAL,就调用compareAndSetWaitStatus方法将它的状态置为SIGNAL,然后将当前节点的下一个节点置为到pred的下一个节点。

(3)pred是头结点,也就是当前节点在头结点之后,此时直接唤醒后继节点即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void cancelAcquire(Node node) {
// 节点为直接返回
if (node == null)
return;
// 将节点绑定的线程置为空
node.thread = null;

// 获取当前节点的前一个节点
Node pred = node.prev;
// 从当前节点的前一个节点开始,直到找到waitStatus<=0的节点记为pred
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// pred的下一个节点
Node predNext = pred.next;
// 将当前节点的等待状态置为取消
node.waitStatus = Node.CANCELLED;
// (1)如果当前节点是尾节点,将pred置为尾节点
if (node == tail && compareAndSetTail(node, pred)) {
//将pred的next置为空
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//(2)当前节点位于队列中间的情况
// 当前节点不是head节点的下一个节点,或者pred的等待状态是SIGNAL 再或者不是SIGNAL,就将pred的等待状态置为SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {// pred绑定的线程也不能为空
//当前节点的下一个节点
Node next = node.next;
// 如果下一个节点不为空并且不是取消状态
if (next != null && next.waitStatus <= 0)
// 将pred的下一个节点指向当前节点的下一个节点
compareAndSetNext(pred, predNext, next);
} else {
//(3)当前节点是头结点的下一个节点,唤醒后继节点
unparkSuccessor(node);
}
// 当前节点的下一个节点指向自己等待回收
node.next = node;
}
}

5.release()

release方法用来释放锁,释放锁的实现在tryRelease方法,由AQS的子类来实现这个方法,释放锁成功之后,调用unparkSuccessor方法从头结点开始唤醒之后的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public final boolean release(int arg) {
// tryRelease由AQS子类实现,用来释放锁,如果释放锁成功
if (tryRelease(arg)) {
// 头结点
Node h = head;
// 如果队列头节点不为空并且等待装不是0,头结点初始化时等待状态为0,不为0表示它之后的节点处于等待状态
if (h != null && h.waitStatus != 0)
// 唤醒头结点的后继节点
unparkSuccessor(h);
return true;
}
return false;
}

// 唤醒后继节点
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
// 如果当前节点的等待状态小于0,将等待状态置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 当前节点的下一个节点
Node s = node.next;
// 下一个节点为空或者状态是取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从队列的尾部开始向前查找,直到找到第一个等待状态<=0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒节点绑定的线程
LockSupport.unpark(s.thread);
}

参考:

[Idea Buffer:深入理解AbstractQueuedSynchronizer(一)](http://www.ideabuffer.cn/2017/03/15/深入理解AbstractQueuedSynchronizer(一)/)

我们都是小青蛙:java并发性能(五)

jk’s Blog:【Java并发】详解 AbstractQueuedSynchronizer

waterystone:Java并发之AQS详解

go2sea:Java多线程之JUC包:AbstractQueuedSynchronizer(AQS)源码学习笔记

jdk版本:1.8

【Elasticsearch】Master选举流程

Posted on 2019-03-25

Elasticsearch中Discovery模块负责发现集群中的节点以及Master的选举,其默认的实现称为Zen Discovery。

在Elasticsearch的配置文件中,有一项为node.master,如果将该配置设为true,该节点即可参与Master选举,获得被选举为Master节点的资格。

Master选举算法

(1)Paxos算法

Paxos算法在分布式系统中是一种比较著名的选举算法,并且非常强大,但是它实现比较复杂,这里不过多讲解。

(2)Bully算法

Bully算法假设集群中所有的节点都有一个唯一的ID,通过对ID进行排序,选取ID最大的节点作为Master节点。

brain-split问题

一个集群建立起之后会选出一个master,负责管理整个集群,当master负载比较大时或者产生网络分区时,导致其他节点可能认为master节点已失效,从而选举新的节点,出现多个master的情况,这就是brain-split问题。
ES在选举master时,获得的投票数必须要达到quorum也就是参选人数需要过半,才能确认Master,quorum的数量可以在配置文件中配置discovery.zen.minimum_master_nodes,一般配置数量为集群中具有master资格的节点数除以2加1:

1
discovery.zen.minimum_master_nodes: 1

Elasticsearch Master选举流程

Elasticsearch基于Bully算法,选举流程如下:

1.Ping所有的节点,选举临时的Master

  • fullPingResponses

选举过程的实现位于ZenDiscovery类的findMaster方法中,在该方法中首先Ping集群中的所有节点,得到返回结果fullPingResponses,fullPingResponses是由集群中的节点组成的列表,但是不包含当前的节点,当前节点单独被添加到fullPingResponses中。接着,将discovery.zen.master_election.ignore_non_master_pings为true并且不具备Master资格的节点过滤掉,并放入pingResponses中。

  • activeMasters

activeMasters存储当前活跃的Master列表,它是通过遍历pingResponses,将每个节点认为的Master节点(并且不是当前节点)加入到activeMasters列表中。

  • masterCandidates

masterCandidates存储Master候选者列表,也就是具有Master资格的节点,它也是通过遍历pingResponses,判断每个节点是否具有Master资格得到的候选者列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

private void innerJoinCluster() {
DiscoveryNode masterNode = null;
final Thread currentThread = Thread.currentThread();
nodeJoinController.startElectionContext();
// 如果master为空
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
// 选举临时的Master
masterNode = findMaster();
}
......
}

private DiscoveryNode findMaster() {
logger.trace("starting to ping");
//ping所有的节点,得到节点列表,列表中不包含当前节点
List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
if (fullPingResponses == null) {
logger.trace("No full ping responses");
return null;
}
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder();
if (fullPingResponses.size() == 0) {
sb.append(" {none}");
} else {
for (ZenPing.PingResponse pingResponse : fullPingResponses) {
sb.append("\n\t--> ").append(pingResponse);
}
}
logger.trace("full ping responses:{}", sb);
}
// 获取当前节点
final DiscoveryNode localNode = transportService.getLocalNode();

// add our selves
assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
.filter(n -> n.equals(localNode)).findAny().isPresent() == false;
// 将当前节点加入节点列表
fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

// 过滤节点,将discovery.zen.master_election.ignore_non_master_pings为true并且不具备Master资格的节点过滤掉
final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
// activeMasters存储当前活跃的Master列表
List<DiscoveryNode> activeMasters = new ArrayList<>();
// 遍历ping后得到的节点列表
for (ZenPing.PingResponse pingResponse : pingResponses) {
// 如果每个节点选举出的Master节点不为空并且不是当前节点
if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
// 将推举出的master节点加入到activeMasters中
activeMasters.add(pingResponse.master());
}
}

// masterCandidates存储Master候选者列表
List<ElectMastermasterCandidatesService.MasterCandidate> masterCandidates = new ArrayList<>();
// 再次遍历集群中的节点列表
for (ZenPing.PingResponse pingResponse : pingResponses) {
// 如果节点具有Master资格
if (pingResponse.node().isMasterNode()) {
// 加入到masterCandidates候选者列表中
masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
}
}
// 如果当前活跃的Master列表为空
if (activeMasters.isEmpty()) {
// 从候选者Master列表中判断是否达到法定人数
if (electMaster.hasEnoughCandidates(masterCandidates)) {
// 选举出Master节点
final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
logger.trace("candidate {} won election", winner);
// 返回Master节点
return winner.getNode();
} else {
// 如果没有足够的候选者,返回空
logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
masterCandidates, electMaster.minimumMasterNodes());
return null;
}
} else {
// 判断当前节点是否在活跃的Master列表中
assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
// 选举Master节点
return electMaster.tieBreakActiveMasters(activeMasters);
}
}
}
具体选举过程

首先判断activeMasters是否为空,如果不为空,从activeMasters选举。如果为空,判断masterCandidates中候选者是否达到了法定人数 quorum,如果达到,从中选举Master节点,如果未到达,重新获取节点。

(1)从masterCandidates选举

具体的实现在ElectMasterService中:

判断是否到达法定人数

  • 判断masterCandidates是否为空,为空返回false
  • 判断discovery.zen.minimum_master_nodes配置的值(默认值为-1)是否小于1,确保单节点情况下的正常选主
  • 判断masterCandidates中具备master资格的节点数据是否大于等于minimum_master_nodes

选举临时Master

  • 首先对masterCandidates中的节点进行排序,优先使用版本号,版本号最高的排在前面,如果版本号一致,再跟进节点ID进行排序,节点ID小的排在前面
  • 返回排序后最前面的节点,也就是第一个节点作为Master节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class ElectMasterService extends AbstractComponent {

// 是否达到法定人数
public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
// 如果为空返回false
if (candidates.isEmpty()) {
return false;
}
// minimumMasterNodes默认值为-1,确保单节点情况下正常选主
if (minimumMasterNodes < 1) {
return true;
}
assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
"duplicates ahead: " + candidates;
// 判断是否达到法定人数
return candidates.size() >= minimumMasterNodes;
}

// 选举Master
public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
assert hasEnoughCandidates(candidates);
List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
// 对节点进行排序
sortedCandidates.sort(MasterCandidate::compare);
// 获取排序后的第一个节点作为Master节点
return sortedCandidates.get(0);
}

// 内部类,自定义比较函数在此类中实现
public static class MasterCandidate {

public static final long UNRECOVERED_CLUSTER_VERSION = -1;

final DiscoveryNode node;

final long clusterStateVersion;

public MasterCandidate(DiscoveryNode node, long clusterStateVersion) {
Objects.requireNonNull(node);
assert clusterStateVersion >= -1 : "got: " + clusterStateVersion;
assert node.isMasterNode();
this.node = node;
this.clusterStateVersion = clusterStateVersion;
}

public DiscoveryNode getNode() {
return node;
}

public long getClusterStateVersion() {
return clusterStateVersion;
}

@Override
public String toString() {
return "Candidate{" +
"node=" + node +
", clusterStateVersion=" + clusterStateVersion +
'}';
}

/**
* 自定义比较函数,早期版本中对节点ID排序,现在优先使集群状态版本号排序,版本号高的放在前面,版本号一致的再对比节点ID,节点ID小的排前面
*/
public static int compare(MasterCandidate c1, MasterCandidate c2) {
// 比较版本号,注意这里c2在前
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
if (ret == 0) {
// 如果版本号一致比较节点的ID
ret = compareNodes(c1.getNode(), c2.getNode());
}
return ret;
}
}

/**
* 根据节点ID比较大小
*/
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
// 判断是否具有master节点资格是为了给其他函数调用的,masterCandidates中的节点都已经具备了master节点资格
// 如果o1具备Master节点资格,o2不具备,返回-1,也就是o1排前面
if (o1.isMasterNode() && !o2.isMasterNode()) {
return -1;
}
// 如果o1不具备master节点资格而o2有,返回1,o2排前面
if (!o1.isMasterNode() && o2.isMasterNode()) {
return 1;
}
// 比较节点的ID
return o1.getId().compareTo(o2.getId());
}
}

(2)从activeMasters选举

从activeMasters选举的过程比较简单,具体的实现也在ElectMasterService中。首先通过compareNodes方法对节点ID排序,然后取节点ID最小的作为临时的Master。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ElectMasterService extends AbstractComponent {
public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
// 取activeMasters中节点ID最小的作为Master,同样使用compareNodes进行排序
return activeMasters.stream().min(ElectMasterService::compareNodes).get();
}

/**
* 根据节点ID比较大小
*/
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
// 判断是否具有master节点资格是为了给其他函数调用的,masterCandidates中的节点都已经具备了master节点资格
// 如果o1具备Master节点资格,o2不具备,返回-1,也就是o1排前面
if (o1.isMasterNode() && !o2.isMasterNode()) {
return -1;
}
// 如果o1不具备master节点资格而o2有,返回1,o2排前面
if (!o1.isMasterNode() && o2.isMasterNode()) {
return 1;
}
// 比较节点的ID
return o1.getId().compareTo(o2.getId());
}
}

2.确立Master

上一步选举出的临时的Master有两种情况,临时Master就是当前节点或者临时Master不是当前节点。
在ES中,向节点发送join请求就是发送投票,被发送请求的节点将会得到一票。

(1)临时Master节点为当前节点

  • 等待足够多的具有Master资格的节点加入本节点,达到法定人数的投票数量时完成选主
  • 如果等待超时后还没有满足法定数量,选举失败,将会进行新一轮的选举
  • 如果选主成功,将会发布新的cluster state version

ZenDiscovery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

private void innerJoinCluster() {
DiscoveryNode masterNode = null;
final Thread currentThread = Thread.currentThread();
nodeJoinController.startElectionContext();
// 如果master为空
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
// 选举临时的Master
masterNode = findMaster();
}

if (!joinThreadControl.joinThreadActive(currentThread)) {
logger.trace("thread is no longer in currentJoinThread. Stopping.");
return;
}
// 如果选举出的Master节点是当前节点
if (transportService.getLocalNode().equals(masterNode)) {
// 因为选举出的Master节点是当前节点,所以minimum_master_nodes数量-1,得到被选举为Master需要的最少节点数量,也就是法定人数
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1);
logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
// 根据法定人数等待其他节点的加入(投票),等待完成选举
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.ElectionCallback() {// 回调函数
@Override
public void onElectedAsMaster(ClusterState state) {
synchronized (stateMutex) {
// 选举完成 joinThreadControl.markThreadAsDone(currentThread);
}
}

@Override
public void onFailure(Throwable t) {// 如果选举失败
logger.trace("failed while waiting for nodes to join, rejoining", t);
synchronized (stateMutex) {
// 重新选举 joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
}

);
} else {
......
}
}
}

(2)临时Master节点不是当前节点

  • 当前节点停止接收其他节点的join请求
  • 最终当选的Master会先发布集群状态,之后确认其他节点的join请求。
  • 当前节点向Master节点发送join请求,并且等待Master的回复(默认为1分钟),如果失败重新发送join请求(默认重试3次),如果回复成功,从集群中获取Master节点,判断与临时Master是否一致,如果不一致重新选举。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

private void innerJoinCluster() {

......

// 如果选举出的Master节点是当前节点
if (transportService.getLocalNode().equals(masterNode)) {
......

} else { // 选举出的Master节点不是当前节点
// 停止其他节点的join请求
nodeJoinController.stopElectionContext(masterNode + " elected");

// 因为当前节点不是当选的临时master节点,临时Master节点当做最终的Master节点,向Master节点发出join请求
final boolean success = joinElectedMaster(masterNode);

synchronized (stateMutex) {
if (success) {// 如果成功
// 从集群中获取Master节点
DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
if (currentMasterNode == null) {
// 如果master为空,重新选举
logger.debug("no master node is set, despite of join request completing. retrying pings.");
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
} else if (currentMasterNode.equals(masterNode) == false) { //如果当选的master节点不是之前选出的临时Master节点

// 停止当前线程并且重新join joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
}
//选举
joinThreadControl.markThreadAsDone(currentThread);
} else {
// 如果请求失败,重新发出join请求
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
}
}
}

投票与得票的实现

在确定Master节点的时候,如果推选的临时Master为当前节点,有一步是调用waitToBeElectedAsMaster方法,等待当前节点被推举为真正的Master节点,借助waitToBeElectedAsMaster方法看一下投票与得票的实现。

NodeJoinController

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
public class NodeJoinController extends AbstractComponent {

public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
// 创建一个计数器
final CountDownLatch done = new CountDownLatch(1);
// 创建回调函数
final ElectionCallback wrapperCallback = new ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
// 选举成功,计数器减1
done.countDown();
callback.onElectedAsMaster(state);
}

@Override
public void onFailure(Throwable t) {
// 如果选举失败,计数器减1
done.countDown();
callback.onFailure(t);
}
};

ElectionContext myElectionContext = null;

try {
synchronized (this) {
// 判断ElectionContext是否为空
assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
myElectionContext = electionContext;
// 设置回调函数和法定人数,为选举做准备
electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);
// 这里判断是否到达法定人数,如果达到,被选为Master
checkPendingJoinsAndElectIfNeeded();
}

try {
// 选主成功或失败都会将计数器减1,这里等待选举结束(设置了超时时间,如果在这段时间内选举还没有结束,放弃等待)
if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
// callback handles everything
return;
}
} catch (InterruptedException e) {

}
if (logger.isTraceEnabled()) {
// 再次收集投票数量
final int pendingNodes = myElectionContext.getPendingMasterJoinsCount();
logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes);
}
// 停止选举
failContextIfNeeded(myElectionContext, "timed out waiting to be elected");
} catch (Exception e) {
logger.error("unexpected failure while waiting for incoming joins", e);
if (myElectionContext != null) {
failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + e.getMessage() + "]");
}
}
}

/**
* 检查发送请求的数量(投票数)是否达到成为Master的条件
*/
private synchronized void checkPendingJoinsAndElectIfNeeded() {
assert electionContext != null : "election check requested but no active context";
// 获取投票数
final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
// 判断投票数是否达到法定人数
if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
if (logger.isTraceEnabled()) {
logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
electionContext.requiredMasterJoins);
}
} else {// 如果达到了法定人数
if (logger.isTraceEnabled()) {
logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
electionContext.requiredMasterJoins);
}
// 成为Master节点
electionContext.closeAndBecomeMaster();
electionContext = null; // clear this out so future joins won't be accumulated
}
}

// 选举为Master
public synchronized void closeAndBecomeMaster() {
assert callback != null : "becoming a master but the callback is not yet set";
assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master but pending joins of "
+ getPendingMasterJoinsCount() + " are not enough. needs [" + requiredMasterJoins + "];";

innerClose();

Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";

tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
// 这里更新cluster state version
masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}

// 是否达到法定人数
public synchronized boolean isEnoughPendingJoins(int pendingMasterJoins) {
final boolean hasEnough;
// 如果法定人数小于0
if (requiredMasterJoins < 0) {
hasEnough = false;
} else {
assert callback != null : "requiredMasterJoins is set but not the callback";
// 判断投票数是否达到法定人数
hasEnough = pendingMasterJoins >= requiredMasterJoins;
}
return hasEnough;
}

class ElectionContext {
private ElectionCallback callback = null;
private int requiredMasterJoins = -1;
private final Map<DiscoveryNode, List<MembershipAction.JoinCallback>> joinRequestAccumulator = new HashMap<>();

final AtomicBoolean closed = new AtomicBoolean();

public synchronized void onAttemptToBeElected(int requiredMasterJoins, ElectionCallback callback) {
ensureOpen();
assert this.requiredMasterJoins < 0;
assert this.callback == null;
this.requiredMasterJoins = requiredMasterJoins;
this.callback = callback;
}

......

// 获取投票数
public synchronized int getPendingMasterJoinsCount() {
int pendingMasterJoins = 0;
// 节点收到的join连接被存储在joinRequestAccumulator,遍历joinRequestAccumulator.
for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
// 如果发送请求的节点具有master节点资格
if (node.isMasterNode()) {
// 投票数+1
pendingMasterJoins++;
}
}
return pendingMasterJoins;
}

......
}
}

参考:

Elasticsearch源码解析与优化实战【张超】

Elasticsearch分布式一致性原理剖析(一)-节点篇

Elasticsearch版本:6.1.2

1…3456

shan

53 posts
12 tags
© 2022 shan
Powered by Hexo
|
Theme — NexT.Muse v5.1.4