【Elasticsearch】Write Flow

Elasticsearch supports writing a single document or writing documents in bulk. A single-document write is an Index request and a bulk write is a Bulk request; both share the same processing logic, and requests are uniformly wrapped as a BulkRequest.
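A minimal sketch of this wrapping, assuming the 6.x Java client API (index name, type and field values here are made up for illustration): a bulk request simply carries multiple single-document operations.

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Sketch: one BulkRequest carrying two single-document index operations.
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("my_index", "doc", "1")          // index, type, id (6.x-style API)
        .source("{\"user\":\"alice\"}", XContentType.JSON));
bulkRequest.add(new IndexRequest("my_index", "doc", "2")
        .source("{\"user\":\"bob\"}", XContentType.JSON));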

The write flow for a single document:

1. The client sends a write request to Node1.

2. Node1 uses the document ID to determine that the document belongs to shard 0. The primary of shard 0 is not on this node, so Node1 acts as the coordinating node: from the routing table in the cluster state it learns that the primary of shard 0 is on Node3, and the request is forwarded to Node3.

3. The primary shard on Node3 executes the write (provided there are enough active shards). If the write succeeds, the request is forwarded to Node1 and Node2, because both nodes hold replicas of shard 0. Node3 waits for the responses from all replicas; once every shard has written successfully, Node3 responds to the coordinating node, and the coordinating node Node1 reports success to the client.

1. Active shard count

public final Request waitForActiveShards(ActiveShardCount waitForActiveShards) {
    this.waitForActiveShards = waitForActiveShards;
    return (Request) this;
}

ES's write consistency was traditionally governed by a quorum policy, i.e. a majority of shard copies must be available for the write operation. It is computed as:
quorum = int((primary + number_of_replicas) / 2) + 1

Here primary is the number of primary shards and number_of_replicas is the number of replicas. Suppose an index has 3 replicas per primary shard: quorum = (1 + 3) / 2 + 1 = 3, so at least 3 shard copies must be active for a write to that primary to succeed. If two replicas fail, only 1 primary and 1 replica are active, i.e. 2 active copies, and writes to this shard are not allowed.

The required number of active shards can be set via the index setting index.write.wait_for_active_shards, or as a parameter on the request. The default is 1, meaning a write can proceed as long as the primary shard is available.
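As a hedged sketch of the per-request form, assuming the 6.x Java client API and the waitForActiveShards setter shown above (index name and document values are made up): requiring 3 active copies matches the quorum example with 1 primary and 3 replicas.

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.common.xcontent.XContentType;

// Sketch: this write only proceeds once at least 3 copies of the target shard are active.
IndexRequest request = new IndexRequest("my_index", "doc", "1")
        .source("{\"field\":\"value\"}", XContentType.JSON)
        .waitForActiveShards(ActiveShardCount.from(3));    // ActiveShardCount.ALL / ONE / DEFAULT also exist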

For details, see the official documentation:

https://www.elastic.co/guide/en/elasticsearch/reference/6.1/docs-index_.html#index-wait-for-active-shards

2. Routing algorithm

In the common case, the routing calculation is:

shard_num = hash(_routing) % num_primary_shards

_routing: by default this is the document ID

num_primary_shards: the number of primary shards
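A minimal sketch of this calculation (plain Java, using String.hashCode as a stand-in hash purely for illustration; Elasticsearch itself hashes _routing with Murmur3 and also supports routing_partition_size, which is ignored here):

// Stand-in for the routing formula: shard_num = hash(_routing) % num_primary_shards
static int shardNum(String routing, int numPrimaryShards) {
    int hash = routing.hashCode();                    // ES actually uses a Murmur3 hash of _routing
    return Math.floorMod(hash, numPrimaryShards);     // floorMod keeps the result in [0, numPrimaryShards)
}

// The same _routing always maps to the same shard, which is also why the number of
// primary shards cannot be changed after index creation.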

Coordinating node flow

  • Parameter checks; if anything is invalid, the request is rejected.
  • On write, if the target index does not exist yet, the corresponding index is created automatically; this happens in TransportBulkAction's doExecute method.
  • The coordinating node then processes the request; the entry point is TransportBulkAction's executeBulk method, which leads into the doRun method of TransportBulkAction's inner class BulkOperation.
  • In doRun, the cluster state is checked; if the cluster is abnormal, e.g. no master node exists, the write request blocks waiting for a master.
  • The cluster metadata (metaData) is obtained from the cluster state.
  • Iterate over the bulkRequest: resolve each sub-request's concrete index (concreteIndex) and determine its operation type. For index operations the request is cast to an IndexRequest, the index metadata is looked up in metaData via concreteIndex, the mapping and the index creation version are retrieved, and the mapping, ID and related fields are validated; if the ID is empty, one is generated automatically.
  • Iterate over the bulkRequest again and regroup the sub-requests into per-shard request lists, so that all requests routed to the same shard are wrapped in the same request. During this step the routing algorithm computes which shard each document should be stored on, yielding the shard ID, and requests destined for the same shard are merged into the same shardRequests list.
  • Iterate over requestsByShard, which records all write requests for each shard, wrap each shard's requests into a BulkShardRequest, wait for enough active shards, then execute the request against the shard and wait for the response in a listener. Responses are also per shard and are collected into the overall response.
  • Forwarding requests to a shard is implemented in TransportReplicationAction. Before forwarding, the latest cluster state is fetched again and its routing table is used to find the node hosting the primary of the target shard; if the primary is on the current node, the request is executed locally, otherwise it is forwarded to that node.

TransportBulkAction

The doRun method of BulkOperation inside TransportBulkAction parses the user request:

public class TransportBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {

    @Override
    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        ......

        if (needToCheck()) {
            // Collect all indices referenced by the request
            final Set<String> indices = bulkRequest.requests.stream()
                    .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
                            || request.versionType() == VersionType.EXTERNAL
                            || request.versionType() == VersionType.EXTERNAL_GTE)
                    .map(DocWriteRequest::index)
                    .collect(Collectors.toSet());
            final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
            Set<String> autoCreateIndices = new HashSet<>();
            ClusterState state = clusterService.state();
            // Iterate over all indices
            for (String index : indices) {
                boolean shouldAutoCreate;
                try {
                    // Should this index be auto-created?
                    shouldAutoCreate = shouldAutoCreate(index, state);
                } catch (IndexNotFoundException e) {
                    shouldAutoCreate = false;
                    indicesThatCannotBeCreated.put(index, e);
                }
                if (shouldAutoCreate) {
                    // Auto-create the index
                    autoCreateIndices.add(index);
                }
            }

            ......

        } else {
            executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
        }
    }

    private final class BulkOperation extends AbstractRunnable {
        // Process the request
        protected void doRun() throws Exception {
            // Check the cluster state; if there is no master node, block waiting for one (possibly until timeout)
            final ClusterState clusterState = observer.setAndGetObservedState();
            if (handleBlockExceptions(clusterState)) {
                return;
            }
            final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
            // Get the cluster metadata
            MetaData metaData = clusterState.metaData();
            // Iterate over the requests
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest docWriteRequest = bulkRequest.requests.get(i);
                //the request can only be null because we set it to null in the previous step, so it gets ignored
                if (docWriteRequest == null) {
                    continue;
                }
                if (addFailureIfIndexIsUnavailable(docWriteRequest, i, concreteIndices, metaData)) {
                    continue;
                }
                // Resolve the concrete index
                Index concreteIndex = concreteIndices.resolveIfAbsent(docWriteRequest);
                try {
                    // Dispatch on the operation type
                    switch (docWriteRequest.opType()) {
                        case CREATE:
                        case INDEX: // index (write) operation
                            // Cast to IndexRequest
                            IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                            // Get the index metadata
                            final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                            // Get the mapping metadata
                            MappingMetaData mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                            // Get the index creation version
                            Version indexCreated = indexMetaData.getCreationVersion();
                            // Resolve routing
                            indexRequest.resolveRouting(metaData);
                            // Validate mapping, ID, and related fields
                            indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
                            break;
                        case UPDATE: // update operation
                            TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
                            break;
                        case DELETE: // delete operation
                            docWriteRequest.routing(metaData.resolveIndexRouting(docWriteRequest.parent(), docWriteRequest.routing(), docWriteRequest.index()));
                            // check if routing is required, if so, throw error if routing wasn't specified
                            if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
                                throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
                            }
                            break;
                        default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
                    }
                } catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
                    BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
                    BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
                    responses.set(i, bulkItemResponse);
                    // make sure the request gets never processed again
                    bulkRequest.requests.set(i, null);
                }
            }

            // Regroup the user's requests into per-shard request lists
            Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
            for (int i = 0; i < bulkRequest.requests.size(); i++) {
                DocWriteRequest request = bulkRequest.requests.get(i);
                if (request == null) {
                    continue;
                }
                // Resolve the concrete index name
                String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
                // Use the routing algorithm to compute which shard the document belongs to, yielding the shard ID
                ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
                // If the shard ID already exists in requestsByShard, reuse its list; otherwise create an empty ArrayList keyed by the shard ID
                List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
                // Add the current request to shardRequests, so documents routed to the same shard end up in the same shard request
                shardRequests.add(new BulkItemRequest(i, request));
            }
            // If requestsByShard is empty
            if (requestsByShard.isEmpty()) {
                listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
                return;
            }

            final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
            // ID of the current node
            String nodeId = clusterService.localNode().getId();
            // Iterate over requestsByShard
            for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
                // The shard ID
                final ShardId shardId = entry.getKey();
                // All requests for that shard ID
                final List<BulkItemRequest> requests = entry.getValue();
                // Wrap the requests into a BulkShardRequest
                BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(),
                        requests.toArray(new BulkItemRequest[requests.size()]));
                // Wait for enough active shards
                bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
                bulkShardRequest.timeout(bulkRequest.timeout());
                if (task != null) {
                    bulkShardRequest.setParentTask(nodeId, task.getId());
                }
                // Execute the request against the shard and wait for the response in the listener
                shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {
                    @Override
                    public void onResponse(BulkShardResponse bulkShardResponse) {
                        for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) {
                            // we may have no response if item failed
                            if (bulkItemResponse.getResponse() != null) {
                                bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
                            }
                            responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        // create failures for all relevant requests
                        for (BulkItemRequest request : requests) {
                            final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
                            DocWriteRequest docWriteRequest = request.request();
                            responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
                                    new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
                        }
                        if (counter.decrementAndGet() == 0) {
                            finishHim();
                        }
                    }

                    private void finishHim() {
                        listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
                    }
                });
            }
        }
    }
}

TransportReplicationAction

The doRun method of TransportReplicationAction's inner class ReroutePhase forwards the request to the shard:

public abstract class TransportReplicationAction<
            Request extends ReplicationRequest<Request>,
            ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
            Response extends ReplicationResponse
        > extends TransportAction<Request, Response> {

    final class ReroutePhase extends AbstractRunnable {
        @Override
        protected void doRun() {
            setPhase(task, "routing");
            // Get the cluster state
            final ClusterState state = observer.setAndGetObservedState();
            if (handleBlockExceptions(state)) {
                return;
            }

            // Resolve the index from the request
            final String concreteIndex = concreteIndex(state);
            // Get the index metadata
            final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
            // If the metadata is missing
            if (indexMetaData == null) {
                retry(new IndexNotFoundException(concreteIndex));
                return;
            }
            if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
                throw new IndexClosedException(indexMetaData.getIndex());
            }

            // Resolve the request (sets the shard ID and the required number of active shards)
            resolveRequest(indexMetaData, request);
            assert request.shardId() != null : "request shardId must be set in resolveRequest";
            assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
            // Find the node hosting the primary shard
            final ShardRouting primary = primary(state);
            if (retryIfUnavailable(state, primary)) {
                return;
            }
            final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
            // If the primary is on the local node, execute locally
            if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
                performLocalAction(state, primary, node, indexMetaData);
            } else {
                // Otherwise forward to the node hosting the primary
                performRemoteAction(state, primary, node);
            }
        }

    }

}

Primary shard node flow

After the node holding the primary shard receives the request from the coordinating node, it starts the write; the entry point is the execute method of ReplicationOperation. Once the write succeeds, the request is forwarded to the replica shards, and the node waits for their responses before replying to the coordinating node.

  • After receiving the request from the coordinating node, the primary also performs validation first, e.g. checking that it really is the primary shard and that the index is not closed.
  • Check whether the request needs to be executed with a delay.
  • Check whether the primary shard has been relocated.
  • Check whether there are enough active shards; with the default wait_for_active_shards of 1, the write proceeds as long as the primary is available.
  • The concrete write entry point is the execute method of ReplicationOperation.
  • From there the flow reaches TransportShardBulkAction's performOnPrimary method (some intermediate steps omitted) and, after a few more calls, ends up in InternalEngine's index method, where the data is written into Lucene.
  • After writing to Lucene, the operation is appended to the translog; if the Lucene write fails, the translog must be rolled back accordingly.
  • The translog is flushed to disk according to the configured translog durability policy.
  • Once the primary has finished writing, the request is forwarded to the replica shards, which perform the write while the primary waits for their responses.
  • After all replica responses have been received, finish is executed and a message is returned to the coordinating node reporting which operations succeeded and which failed.
  • If a replica write fails, the node holding the primary sends a shardFailed request to the master.

ReplicationOperation

The execute method of ReplicationOperation is the entry point of the primary shard write:

public class ReplicationOperation<
            Request extends ReplicationRequest<Request>,
            ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
            PrimaryResultT extends ReplicationOperation.PrimaryResult<ReplicaRequest>
        > {

    // Entry point of the primary shard write
    public void execute() throws Exception {
        // Check the number of active shards
        final String activeShardCountFailure = checkActiveShardCount();
        final ShardRouting primaryRouting = primary.routingEntry();
        final ShardId primaryId = primaryRouting.shardId();
        if (activeShardCountFailure != null) {
            finishAsFailed(new UnavailableShardsException(primaryId,
                "{} Timeout: [{}], request: [{}]", activeShardCountFailure, request.timeout(), request));
            return;
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination
        // Perform the write on the primary
        primaryResult = primary.perform(request);
        // Update the shard's local checkpoint
        primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint());
        // Get the request for the replicas
        final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
        if (replicaRequest != null) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request);
            }
            // Global checkpoint
            final long globalCheckpoint = primary.globalCheckpoint();
            final ReplicationGroup replicationGroup = primary.getReplicationGroup();
            markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable());
            // Forward the request to the replica shards
            performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable());
        }

        successfulShards.incrementAndGet(); // mark primary as successful
        decPendingAndFinishIfNeeded();
    }
}

TransportShardBulkAction

The primary shard executes the write in TransportShardBulkAction's performOnPrimary method:

public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {

    @Override
    public WritePrimaryResult<BulkShardRequest, BulkShardResponse> shardOperationOnPrimary(
            BulkShardRequest request, IndexShard primary) throws Exception {
        return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer());
    }

    // Write on the primary shard
    public static WritePrimaryResult<BulkShardRequest, BulkShardResponse> performOnPrimary(
            BulkShardRequest request,
            IndexShard primary,
            UpdateHelper updateHelper,
            LongSupplier nowInMillisSupplier,
            MappingUpdatePerformer mappingUpdater) throws Exception {
        // Get the index metadata
        final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
        Translog.Location location = null;
        // Iterate over the item requests
        for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
            if (isAborted(request.items()[requestIndex].getPrimaryResponse()) == false) {
                // Execute the individual write request
                location = executeBulkItemRequest(metaData, primary, request, location, requestIndex,
                        updateHelper, nowInMillisSupplier, mappingUpdater);
            }
        }
        // Build the response
        BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
        BulkItemRequest[] items = request.items();
        for (int i = 0; i < items.length; i++) {
            responses[i] = items[i].getPrimaryResponse();
        }
        BulkShardResponse response = new BulkShardResponse(request.shardId(), responses);
        return new WritePrimaryResult<>(request, response, location, null, primary, logger);
    }
}

Since the method call stack is fairly deep, the intermediate methods are omitted and we jump straight to InternalEngine's index method. The omitted call chain is:

executeBulkItemRequest(TransportShardBulkAction)—>
executeIndexRequest(TransportShardBulkAction)—>
executeIndexRequestOnPrimary(TransportShardBulkAction)—>
applyIndexOperationOnPrimary(IndexShard)—>
applyIndexOperation(IndexShard)—>
index(IndexShard)—>
index(InternalEngine)

InternalEngine

InternalEngine's index method writes the data into Lucene and then appends the operation to the translog:

public class InternalEngine extends Engine {

    @Override
    public IndexResult index(Index index) throws IOException {
        assert Objects.equals(index.uid().field(), uidField) : index.uid().field();
        final boolean doThrottle = index.origin().isRecovery() == false;
        try (ReleasableLock releasableLock = readLock.acquire()) {
            ensureOpen();
            assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
            assert assertVersionType(index);
            try (Releasable ignored = acquireLock(index.uid());

            ......

                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.hasFailure();
                } else if (plan.indexIntoLucene) {
                    // Call the Lucene write API to index the document into Lucene
                    indexResult = indexIntoLucene(index, plan);
                } else {
                    indexResult = new IndexResult(
                            plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
                }
                if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                    final Translog.Location location;
                    if (indexResult.hasFailure() == false) {
                        // Append the write operation to the translog
                        location = translog.add(new Translog.Index(index, indexResult));
                    } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                        location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
                    } else {
                        location = null;
                    }
                    indexResult.setTranslogLocation(location);
                }
                ......

                return indexResult;
            }
        } catch (RuntimeException | IOException e) {
            try {
                maybeFailEngine("index", e);
            } catch (Exception inner) {
                e.addSuppressed(inner);
            }
            throw e;
        }
    }
}

GlobalCheckpointSyncAction

GlobalCheckpointSyncAction's maybeSyncTranslog method controls flushing the translog to disk:

public class GlobalCheckpointSyncAction extends TransportReplicationAction<
        GlobalCheckpointSyncAction.Request,
        GlobalCheckpointSyncAction.Request,
        ReplicationResponse> {

    private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
        final Translog translog = indexShard.getTranslog();
        if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
                translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
            indexShard.getTranslog().sync();
        }
    }
}
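The durability policy checked above (Translog.Durability.REQUEST) is controlled by index settings. A minimal sketch, assuming the standard settings index.translog.durability and index.translog.sync_interval applied at index creation (the index name is made up): "request", the default, fsyncs the translog before acknowledging each request, while "async" fsyncs on an interval instead.

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;

// Sketch: trade per-request durability for throughput by fsyncing the translog every 5s.
CreateIndexRequest createIndex = new CreateIndexRequest("my_index")
        .settings(Settings.builder()
                .put("index.translog.durability", "async")   // default is "request"
                .put("index.translog.sync_interval", "5s"));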

Replica shard node flow

The write flow on a replica shard is essentially the same as on the primary; after the write completes, the replica sends a response back to the primary shard.

References:

Elasticsearch源码解析与优化实战【张超】

Elasticsearch源码分析-写入解析

Elasticsearch分布式一致性原理剖析(三)-Data篇

Elasticsearch version: 6.1.2