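【Elasticsearch】Master Election Process

In Elasticsearch, the Discovery module is responsible for discovering the nodes in a cluster and for electing the Master. Its default implementation is called Zen Discovery.

The Elasticsearch configuration file has a node.master setting. When it is set to true, the node is master-eligible: it can take part in Master elections and can itself be elected Master.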

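Master election algorithms

(1) The Paxos algorithm

Paxos is a well-known and very powerful consensus algorithm for distributed systems that can also be used for elections, but it is complex to implement, so it is not covered in detail here.

(2) The Bully algorithm

The Bully algorithm assumes that every node in the cluster has a unique ID. The nodes are ordered by ID, and the node with the largest ID is chosen as the Master.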
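
As a rough illustration of the rule above (this is not Elasticsearch code), the following minimal sketch picks the live node with the largest ID. The class name BullySketch, the method electLeader and the use of integer IDs are assumptions made for this example; a real Bully implementation also exchanges election and coordinator messages, which are left out here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class BullySketch {

    // Returns the largest ID among the nodes that are still reachable.
    static int electLeader(List<Integer> aliveNodeIds) {
        if (aliveNodeIds.isEmpty()) {
            throw new IllegalArgumentException("no live nodes to elect from");
        }
        return Collections.max(aliveNodeIds);
    }

    public static void main(String[] args) {
        // All three nodes are alive: the largest ID wins.
        System.out.println(electLeader(Arrays.asList(3, 7, 5)));
        // The previous winner has failed: the largest remaining ID takes over.
        System.out.println(electLeader(Arrays.asList(3, 5)));
    }
}
```

When the current leader drops out of the list, the same rule is simply applied to the nodes that remain.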

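The split-brain problem

Once a cluster has formed, it elects one master that manages the whole cluster. When the master is under heavy load, or when a network partition occurs, other nodes may conclude that the master has failed and elect a new one, so the cluster ends up with more than one master. This is the split-brain problem.
To guard against this, ES confirms a node as Master only when the votes it receives reach a quorum, that is, more than half of the master-eligible nodes. The quorum is set in the configuration file with discovery.zen.minimum_master_nodes, and is usually configured as the number of master-eligible nodes divided by 2 (integer division), plus 1. For a single master-eligible node, for example, this gives: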

discovery.zen.minimum_master_nodes: 1
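
To make the formula concrete, here is a small sketch that computes the quorum and checks which side of a network partition is still allowed to elect a Master. QuorumSketch, quorum and canElect are names invented for this example; they are not part of Elasticsearch.

```java
class QuorumSketch {

    // Number of master-eligible nodes divided by 2 (integer division), plus 1.
    static int quorum(int masterEligibleNodes) {
        return masterEligibleNodes / 2 + 1;
    }

    // A partition may elect a Master only if it still sees a quorum of master-eligible nodes.
    static boolean canElect(int visibleMasterEligibleNodes, int minimumMasterNodes) {
        return visibleMasterEligibleNodes >= minimumMasterNodes;
    }

    public static void main(String[] args) {
        int minimumMasterNodes = quorum(3);
        System.out.println("minimum_master_nodes for 3 nodes: " + minimumMasterNodes);
        // A 3-node cluster splits into a side with 2 nodes and a side with 1 node.
        System.out.println("side with 2 nodes can elect: " + canElect(2, minimumMasterNodes));
        System.out.println("side with 1 node can elect: " + canElect(1, minimumMasterNodes));
    }
}
```

Because at most one side of a partition can hold more than half of the master-eligible nodes, at most one side can confirm a Master.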

Elasticsearch Master election process

Elasticsearch's election is based on the Bully algorithm. The process is as follows:

1. Ping all nodes and elect a provisional Master

  • fullPingResponses

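The election is implemented in the findMaster method of the ZenDiscovery class. The method first pings all nodes in the cluster and collects the results in fullPingResponses, a list of the nodes that responded. The list does not contain the current node, so the current node is added to it separately. Next, if discovery.zen.master_election.ignore_non_master_pings is true, responses from nodes that are not master-eligible are filtered out, and the remaining responses are stored in pingResponses.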

  • activeMasters

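activeMasters holds the currently active Masters. It is built by iterating over pingResponses and adding the node that each responder regards as its Master, provided that node is not the current node.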

  • masterCandidates

masterCandidates holds the Master candidates, that is, the master-eligible nodes. It is also built by iterating over pingResponses, keeping each responding node that is master-eligible.

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    private void innerJoinCluster() {
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
        nodeJoinController.startElectionContext();
        // Keep going while no master has been found and this join thread is still active
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            // Elect the provisional Master
            masterNode = findMaster();
        }
        // ......
    }

    private DiscoveryNode findMaster() {
        logger.trace("starting to ping");
        // Ping all nodes to get the list of responses; the list does not contain the local node
        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            if (fullPingResponses.size() == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace("full ping responses:{}", sb);
        }
        // Get the local node
        final DiscoveryNode localNode = transportService.getLocalNode();

        // add our selves
        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
        // Add the local node to the list of responses
        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

        // If discovery.zen.master_election.ignore_non_master_pings is true,
        // drop the responses from nodes that are not master-eligible
        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
        // activeMasters holds the currently active Masters
        List<DiscoveryNode> activeMasters = new ArrayList<>();
        // Iterate over the filtered ping responses
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // If the responder knows a Master and that Master is not the local node
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                // Add the reported Master to activeMasters
                activeMasters.add(pingResponse.master());
            }
        }

        // masterCandidates holds the Master candidates
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        // Iterate over the ping responses again
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // If the responding node is master-eligible
            if (pingResponse.node().isMasterNode()) {
                // Add it to the masterCandidates list
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }
        // If there is no active Master
        if (activeMasters.isEmpty()) {
            // Check whether the candidates reach the quorum
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                // Elect the Master from the candidates
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                // Return the elected node
                return winner.getNode();
            } else {
                // Not enough candidates: return null so that the caller pings again
                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                            masterCandidates, electMaster.minimumMasterNodes());
                return null;
            }
        } else {
            // The local node must never appear in activeMasters
            assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
            // Pick the Master among the active Masters
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }
}

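The election in detail

First check whether activeMasters is empty. If it is not empty, elect from activeMasters. If it is empty, check whether the candidates in masterCandidates reach the quorum: if they do, elect the Master from them; if they do not, return null so that the nodes are pinged again.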
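
The branch described above can be modelled in a few lines. The sketch below is a simplified, self-contained model rather than the Elasticsearch implementation: the Ping class is a hypothetical stand-in for ZenPing.PingResponse, node IDs are plain strings, and the tie-break among active Masters compares IDs only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class FindMasterSketch {

    // Hypothetical stand-in for a ping response, reduced to what the decision needs.
    static final class Ping {
        final String nodeId;
        final boolean masterEligible;
        final long clusterStateVersion;
        final String reportedMaster; // null when the node does not know of a master

        Ping(String nodeId, boolean masterEligible, long clusterStateVersion, String reportedMaster) {
            this.nodeId = nodeId;
            this.masterEligible = masterEligible;
            this.clusterStateVersion = clusterStateVersion;
            this.reportedMaster = reportedMaster;
        }
    }

    // Returns the ID of the provisional master, or null when another ping round is needed.
    static String findMaster(String localNodeId, List<Ping> pings, int minimumMasterNodes) {
        List<String> activeMasters = new ArrayList<>();
        List<Ping> candidates = new ArrayList<>();
        for (Ping p : pings) {
            if (p.reportedMaster != null && !p.reportedMaster.equals(localNodeId)) {
                activeMasters.add(p.reportedMaster);
            }
            if (p.masterEligible) {
                candidates.add(p);
            }
        }
        if (!activeMasters.isEmpty()) {
            // Some node already follows a master: take the smallest reported ID.
            return Collections.min(activeMasters);
        }
        boolean enough = !candidates.isEmpty()
                && (minimumMasterNodes < 1 || candidates.size() >= minimumMasterNodes);
        if (!enough) {
            return null; // not enough candidates, ping again
        }
        // Highest cluster state version wins; ties go to the smallest node ID.
        Ping best = candidates.get(0);
        for (Ping p : candidates) {
            boolean newer = p.clusterStateVersion > best.clusterStateVersion;
            boolean sameVersionSmallerId = p.clusterStateVersion == best.clusterStateVersion
                    && p.nodeId.compareTo(best.nodeId) < 0;
            if (newer || sameVersionSmallerId) {
                best = p;
            }
        }
        return best.nodeId;
    }

    public static void main(String[] args) {
        List<Ping> pings = Arrays.asList(
                new Ping("n1", true, 3, null),
                new Ping("n2", true, 3, null),
                new Ping("n3", false, 3, null));
        System.out.println(findMaster("n1", pings, 2)); // two candidates, quorum of 2 reached
        System.out.println(findMaster("n1", pings, 3)); // two candidates, quorum of 3 not reached
    }
}
```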

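(1) Electing from masterCandidates

The implementation is in ElectMasterService:

Checking whether the quorum is reached

  • If masterCandidates is empty, return false
  • If the configured discovery.zen.minimum_master_nodes (default -1) is less than 1, return true, so that election still works in the single-node case
  • Otherwise, check whether the number of master-eligible nodes in masterCandidates is greater than or equal to minimum_master_nodes

Electing the provisional Master

  • First sort the nodes in masterCandidates. The cluster state version takes priority, with the highest version first; if the versions are equal, the nodes are ordered by node ID, with the smaller ID first
  • Return the node at the front of the sorted list, that is, the first node, as the Master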
public class ElectMasterService extends AbstractComponent {

    // Whether the quorum is reached
    public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
        // Return false if there are no candidates
        if (candidates.isEmpty()) {
            return false;
        }
        // minimumMasterNodes defaults to -1; this keeps election working in the single-node case
        if (minimumMasterNodes < 1) {
            return true;
        }
        assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
            "duplicates ahead: " + candidates;
        // Check whether the quorum is reached
        return candidates.size() >= minimumMasterNodes;
    }

    // Elect the Master
    public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        // Sort the candidates
        sortedCandidates.sort(MasterCandidate::compare);
        // The first node after sorting becomes the Master
        return sortedCandidates.get(0);
    }

    // Nested class; the custom comparison function is implemented here
    public static class MasterCandidate {

        public static final long UNRECOVERED_CLUSTER_VERSION = -1;

        final DiscoveryNode node;

        final long clusterStateVersion;

        public MasterCandidate(DiscoveryNode node, long clusterStateVersion) {
            Objects.requireNonNull(node);
            assert clusterStateVersion >= -1 : "got: " + clusterStateVersion;
            assert node.isMasterNode();
            this.node = node;
            this.clusterStateVersion = clusterStateVersion;
        }

        public DiscoveryNode getNode() {
            return node;
        }

        public long getClusterStateVersion() {
            return clusterStateVersion;
        }

        @Override
        public String toString() {
            return "Candidate{" +
                "node=" + node +
                ", clusterStateVersion=" + clusterStateVersion +
                '}';
        }

        /**
         * Custom comparison function. Early versions sorted by node ID only; now the cluster state
         * version takes priority, with the higher version first. If the versions are equal, the node
         * IDs are compared and the smaller ID comes first.
         */
        public static int compare(MasterCandidate c1, MasterCandidate c2) {
            // Compare the versions; note that c2 comes first here, so the higher version sorts first
            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
            if (ret == 0) {
                // If the versions are equal, compare the node IDs
                ret = compareNodes(c1.getNode(), c2.getNode());
            }
            return ret;
        }
    }

    /**
     * Compare two nodes by node ID
     */
    private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
        // The master-eligibility checks exist for other callers;
        // every node in masterCandidates is already master-eligible
        // If o1 is master-eligible and o2 is not, return -1, so o1 sorts first
        if (o1.isMasterNode() && !o2.isMasterNode()) {
            return -1;
        }
        // If o1 is not master-eligible and o2 is, return 1, so o2 sorts first
        if (!o1.isMasterNode() && o2.isMasterNode()) {
            return 1;
        }
        // Compare the node IDs
        return o1.getId().compareTo(o2.getId());
    }
}
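
To see the ordering at work on concrete values, the sketch below applies the same two-level comparison to three made-up candidates. The class CandidateOrderingSketch, its Candidate type and the node IDs are hypothetical and exist only for this example; the comparator mirrors the "higher version first, then smaller ID" rule described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class CandidateOrderingSketch {

    // Hypothetical simplified candidate: a node ID plus the cluster state version it holds.
    static final class Candidate {
        final String nodeId;
        final long clusterStateVersion;

        Candidate(String nodeId, long clusterStateVersion) {
            this.nodeId = nodeId;
            this.clusterStateVersion = clusterStateVersion;
        }
    }

    // Higher cluster state version sorts first; ties are broken by the smaller node ID.
    static final Comparator<Candidate> ORDER = (c1, c2) -> {
        // c2 is passed first so that the higher version compares as "smaller"
        int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
        if (ret == 0) {
            ret = c1.nodeId.compareTo(c2.nodeId);
        }
        return ret;
    };

    // Sorts a copy of the candidates and returns the one at the front.
    static Candidate elect(List<Candidate> candidates) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(ORDER);
        return sorted.get(0);
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
                new Candidate("node-c", 5),
                new Candidate("node-b", 7),
                new Candidate("node-a", 7));
        // node-a and node-b share the highest version; node-a has the smaller ID.
        System.out.println(elect(candidates).nodeId);
    }
}
```

Preferring the highest cluster state version means that a node holding stale cluster state does not win merely because its ID is small.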

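(2) Electing from activeMasters

Electing from activeMasters is simpler, and is also implemented in ElectMasterService. The nodes are compared with the compareNodes method, and the one with the smallest node ID becomes the provisional Master.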

public class ElectMasterService extends AbstractComponent {
    public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
        // Take the node with the smallest ID in activeMasters as the Master, again using compareNodes
        return activeMasters.stream().min(ElectMasterService::compareNodes).get();
    }

    /**
     * Compare two nodes by node ID
     */
    private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
        // The master-eligibility checks exist for other callers;
        // every node in masterCandidates is already master-eligible
        // If o1 is master-eligible and o2 is not, return -1, so o1 sorts first
        if (o1.isMasterNode() && !o2.isMasterNode()) {
            return -1;
        }
        // If o1 is not master-eligible and o2 is, return 1, so o2 sorts first
        if (!o1.isMasterNode() && o2.isMasterNode()) {
            return 1;
        }
        // Compare the node IDs
        return o1.getId().compareTo(o2.getId());
    }
}

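2. Confirming the Master

The provisional Master elected in the previous step is either the current node or some other node.
In ES, sending a join request to a node is how a vote is cast: the node that receives the request gains one vote.

(1) The provisional Master is the current node

  • Wait for enough master-eligible nodes to join this node; the election completes once the number of votes reaches the quorum
  • If the quorum is still not reached when the wait times out, the election fails and a new round begins
  • If the election succeeds, a new cluster state version is published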
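
The waiting step can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: WaitForJoinsSketch and its methods are invented names, each incoming join is modelled as a thread that counts a latch down once, and the candidate's own vote is accounted for by requiring one join fewer than the configured quorum.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class WaitForJoinsSketch {

    // The candidate counts as one vote itself, so it needs quorum - 1 joins (never negative).
    static int requiredJoins(int minimumMasterNodes) {
        return Math.max(0, minimumMasterNodes - 1);
    }

    // Returns true if the required number of joins arrives before the timeout expires.
    static boolean waitForJoins(int requiredJoins, int arrivingJoins, long timeoutMillis) {
        final CountDownLatch remaining = new CountDownLatch(requiredJoins);
        for (int i = 0; i < arrivingJoins; i++) {
            // Each simulated join request counts the latch down once.
            new Thread(remaining::countDown).start();
        }
        try {
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        int needed = requiredJoins(2);
        System.out.println("joins needed: " + needed);
        System.out.println("elected with 1 join: " + waitForJoins(needed, 1, 2000));
        System.out.println("elected with 0 joins: " + waitForJoins(needed, 0, 200));
    }
}
```

A false result corresponds to the timeout case in the list above, after which a new election round would start.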

ZenDiscovery

public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    private void innerJoinCluster() {
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
        nodeJoinController.startElectionContext();
        // Keep going while no master has been found and this join thread is still active
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            // Elect the provisional Master
            masterNode = findMaster();
        }

        if (!joinThreadControl.joinThreadActive(currentThread)) {
            logger.trace("thread is no longer in currentJoinThread. Stopping.");
            return;
        }
        // If the elected Master is the local node
        if (transportService.getLocalNode().equals(masterNode)) {
            // The local node counts as one vote itself, so subtract 1 from minimum_master_nodes
            // to get the number of joins still required to be confirmed as Master
            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1);
            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
            // Wait for that many other nodes to join (vote) so that the election can complete
            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                    new NodeJoinController.ElectionCallback() { // callback
                        @Override
                        public void onElectedAsMaster(ClusterState state) {
                            synchronized (stateMutex) {
                                // Election finished
                                joinThreadControl.markThreadAsDone(currentThread);
                            }
                        }

                        @Override
                        public void onFailure(Throwable t) { // the election failed
                            logger.trace("failed while waiting for nodes to join, rejoining", t);
                            synchronized (stateMutex) {
                                // Start a new election
                                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                            }
                        }
                    }

            );
        } else {
            // ......
        }
    }
}

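(2) The provisional Master is not the current node

  • The current node stops accepting join requests from other nodes
  • The Master that is finally elected first publishes the cluster state and only then acknowledges the join requests of the other nodes.
  • The current node sends a join request to the Master and waits for its reply (1 minute by default). If the request fails, it is sent again (3 attempts by default). If it succeeds, the current node reads the Master from the cluster state and checks that it matches the provisional Master; if it does not, the election starts over.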
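
The retry behaviour in the last bullet can be sketched as a bounded loop. This is an illustration only: JoinRetrySketch and joinWithRetries are invented names, the join request itself is abstracted as a BooleanSupplier, and the sketch retries on every failure, whereas the real implementation may treat different failure types differently and pause between attempts.

```java
import java.util.function.BooleanSupplier;

class JoinRetrySketch {

    // Tries the join up to maxAttempts times and reports whether any attempt succeeded.
    static boolean joinWithRetries(BooleanSupplier sendJoinRequest, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (sendJoinRequest.getAsBoolean()) {
                return true; // the master acknowledged the join
            }
        }
        return false; // all attempts failed: the caller starts a new election round
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Simulated master that rejects the first two join requests and accepts the third.
        boolean joined = joinWithRetries(() -> ++calls[0] >= 3, 3);
        System.out.println("joined=" + joined + " after " + calls[0] + " attempts");
    }
}
```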
public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {

    private void innerJoinCluster() {

        // ......

        // If the elected Master is the local node
        if (transportService.getLocalNode().equals(masterNode)) {
            // ......

        } else { // The elected Master is not the local node
            // Stop accepting join requests from other nodes
            nodeJoinController.stopElectionContext(masterNode + " elected");

            // The local node is not the provisional Master, so treat the provisional Master
            // as the final Master and send it a join request
            final boolean success = joinElectedMaster(masterNode);

            synchronized (stateMutex) {
                if (success) { // the join succeeded
                    // Read the Master from the cluster state
                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                    if (currentMasterNode == null) {
                        // No Master is set: start a new election
                        logger.debug("no master node is set, despite of join request completing. retrying pings.");
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    } else if (currentMasterNode.equals(masterNode) == false) {
                        // The Master in the cluster state is not the provisional Master chosen earlier:
                        // stop the current thread and rejoin
                        joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                    }
                    // Mark this join thread as done
                    joinThreadControl.markThreadAsDone(currentThread);
                } else {
                    // The join failed: start over and send a join request again
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
            }
        }
    }
}

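How votes are cast and counted

When the provisional Master is the current node, one step of confirming the Master is the call to waitToBeElectedAsMaster, which waits for the current node to be confirmed as the real Master. This method is a convenient place to see how votes are cast and counted.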

NodeJoinController

public class NodeJoinController extends AbstractComponent {

    public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
        // Create a latch that is released when the election ends
        final CountDownLatch done = new CountDownLatch(1);
        // Wrap the callback
        final ElectionCallback wrapperCallback = new ElectionCallback() {
            @Override
            public void onElectedAsMaster(ClusterState state) {
                // The election succeeded: count the latch down
                done.countDown();
                callback.onElectedAsMaster(state);
            }

            @Override
            public void onFailure(Throwable t) {
                // The election failed: count the latch down
                done.countDown();
                callback.onFailure(t);
            }
        };

        ElectionContext myElectionContext = null;

        try {
            synchronized (this) {
                // The election context must not be null
                assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
                myElectionContext = electionContext;
                // Set the callback and the required number of joins in preparation for the election
                electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);
                // Check whether the quorum is already reached; if so, become Master
                checkPendingJoinsAndElectIfNeeded();
            }

            try {
                // Success and failure both count the latch down. Wait here for the election to end,
                // giving up if it has not ended within the timeout
                if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
                    // callback handles everything
                    return;
                }
            } catch (InterruptedException e) {

            }
            if (logger.isTraceEnabled()) {
                // Count the pending votes again for the trace log
                final int pendingNodes = myElectionContext.getPendingMasterJoinsCount();
                logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes);
            }
            // Stop the election
            failContextIfNeeded(myElectionContext, "timed out waiting to be elected");
        } catch (Exception e) {
            logger.error("unexpected failure while waiting for incoming joins", e);
            if (myElectionContext != null) {
                failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + e.getMessage() + "]");
            }
        }
    }

    /**
     * Check whether the number of received join requests (votes) is enough to become Master
     */
    private synchronized void checkPendingJoinsAndElectIfNeeded() {
        assert electionContext != null : "election check requested but no active context";
        // Get the number of votes
        final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
        // Check whether the votes reach the quorum
        if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
            if (logger.isTraceEnabled()) {
                logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                    electionContext.requiredMasterJoins);
            }
        } else { // the quorum is reached
            if (logger.isTraceEnabled()) {
                logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                    electionContext.requiredMasterJoins);
            }
            // Become the Master
            electionContext.closeAndBecomeMaster();
            electionContext = null; // clear this out so future joins won't be accumulated
        }
    }

    class ElectionContext {
        private ElectionCallback callback = null;
        private int requiredMasterJoins = -1;
        private final Map<DiscoveryNode, List<MembershipAction.JoinCallback>> joinRequestAccumulator = new HashMap<>();

        final AtomicBoolean closed = new AtomicBoolean();

        public synchronized void onAttemptToBeElected(int requiredMasterJoins, ElectionCallback callback) {
            ensureOpen();
            assert this.requiredMasterJoins < 0;
            assert this.callback == null;
            this.requiredMasterJoins = requiredMasterJoins;
            this.callback = callback;
        }

        // ......

        // Whether the quorum is reached
        public synchronized boolean isEnoughPendingJoins(int pendingMasterJoins) {
            final boolean hasEnough;
            // A negative value means the required number of joins has not been set yet
            if (requiredMasterJoins < 0) {
                hasEnough = false;
            } else {
                assert callback != null : "requiredMasterJoins is set but not the callback";
                // Check whether the votes reach the quorum
                hasEnough = pendingMasterJoins >= requiredMasterJoins;
            }
            return hasEnough;
        }

        // Get the number of votes
        public synchronized int getPendingMasterJoinsCount() {
            int pendingMasterJoins = 0;
            // Join requests received by this node are stored in joinRequestAccumulator; iterate over it
            for (DiscoveryNode node : joinRequestAccumulator.keySet()) {
                // If the node that sent the request is master-eligible
                if (node.isMasterNode()) {
                    // Count one more vote
                    pendingMasterJoins++;
                }
            }
            return pendingMasterJoins;
        }

        // Become the Master
        public synchronized void closeAndBecomeMaster() {
            assert callback != null : "becoming a master but the callback is not yet set";
            assert isEnoughPendingJoins(getPendingMasterJoinsCount()) : "becoming a master but pending joins of "
                + getPendingMasterJoinsCount() + " are not enough. needs [" + requiredMasterJoins + "];";

            innerClose();

            Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
            final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";

            tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
            tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
            // Submit the tasks that update the cluster state (and its version)
            masterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
        }

        // ......
    }
}
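
The counting rule used above can be tried out in isolation. In the sketch below, VoteCountSketch and its methods are hypothetical names, and the accumulated join requests are modelled as a map from node ID to a flag saying whether that node is master-eligible; only joins from master-eligible nodes count as votes, and a negative requirement means the election attempt has not been set up yet.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class VoteCountSketch {

    // Counts the join requests that came from master-eligible nodes.
    static int countVotes(Map<String, Boolean> joinRequests) {
        int votes = 0;
        for (boolean masterEligible : joinRequests.values()) {
            if (masterEligible) {
                votes++;
            }
        }
        return votes;
    }

    // A negative requirement means "not set yet", so it can never be satisfied.
    static boolean hasEnoughVotes(int votes, int requiredMasterJoins) {
        if (requiredMasterJoins < 0) {
            return false;
        }
        return votes >= requiredMasterJoins;
    }

    public static void main(String[] args) {
        Map<String, Boolean> joinRequests = new LinkedHashMap<>();
        joinRequests.put("node-a", true);   // master-eligible
        joinRequests.put("node-b", false);  // not master-eligible, does not count as a vote
        joinRequests.put("node-c", true);   // master-eligible
        int votes = countVotes(joinRequests);
        System.out.println("votes=" + votes + " enough=" + hasEnoughVotes(votes, 2));
    }
}
```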

References:

Elasticsearch源码解析与优化实战【张超】

Elasticsearch分布式一致性原理剖析(一)-节点篇

Elasticsearch version: 6.1.2