Extract non-transport primary logic from TransportReplicationAction #16492

Extracts all the replication logic that is done on the Primary to a separated class called ReplicationOperation. The goal
here is to make unit testing of this logic easier and in the future allow setting up tests that work directly on IndexShards
without the need for networking.

Closes #16492
This commit is contained in:
Boaz Leskes 2016-02-06 11:15:24 +01:00
parent 7a2b923ad1
commit 7cd128b372
19 changed files with 1368 additions and 1429 deletions

View File

@ -233,7 +233,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]ReplicationRequest.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]ReplicationRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]TransportBroadcastReplicationAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]TransportReplicationAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]instance[/\\]InstanceShardOperationRequestBuilder.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]instance[/\\]TransportInstanceSingleOperationAction.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]shard[/\\]SingleShardOperationRequestBuilder.java" checks="LineLength" />
@ -849,7 +848,6 @@
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]TransportMasterNodeActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]BroadcastReplicationTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]TransportReplicationActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]instance[/\\]TransportInstanceSingleOperationActionTests.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]AbstractTermVectorsTestCase.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]GetTermVectorsCheckDocFreqIT.java" checks="LineLength" />

View File

@ -19,6 +19,7 @@
package org.elasticsearch;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -696,8 +697,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
org.elasticsearch.index.translog.TranslogException::new, 115),
PROCESS_CLUSTER_EVENT_TIMEOUT_EXCEPTION(org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException.class,
org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException::new, 116),
RETRY_ON_PRIMARY_EXCEPTION(org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException.class,
org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException::new, 117),
RETRY_ON_PRIMARY_EXCEPTION(ReplicationOperation.RetryOnPrimaryException.class,
ReplicationOperation.RetryOnPrimaryException::new, 117),
ELASTICSEARCH_TIMEOUT_EXCEPTION(org.elasticsearch.ElasticsearchTimeoutException.class,
org.elasticsearch.ElasticsearchTimeoutException::new, 118),
QUERY_PHASE_EXECUTION_EXCEPTION(org.elasticsearch.search.query.QueryPhaseExecutionException.class,

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@ -56,7 +55,7 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
}
@Override
protected Tuple<ReplicationResponse, ShardFlushRequest> shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) {
protected Tuple<ReplicationResponse, ShardFlushRequest> shardOperationOnPrimary(ShardFlushRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
indexShard.flush(shardRequest.getRequest());
logger.trace("{} flush request executed on primary", indexShard.shardId());

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
@ -58,7 +57,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction<Basi
}
@Override
protected Tuple<ReplicationResponse, BasicReplicationRequest> shardOperationOnPrimary(MetaData metaData, BasicReplicationRequest shardRequest) {
protected Tuple<ReplicationResponse, BasicReplicationRequest> shardOperationOnPrimary(BasicReplicationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
indexShard.refresh("api");
logger.trace("{} refresh request executed on primary", indexShard.shardId());

View File

@ -261,7 +261,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
}
try {
indexRequest.process(metaData, mappingMd, allowIdGeneration, concreteIndex.getName());
indexRequest.resolveRouting(metaData);
indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
} catch (ElasticsearchParseException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), indexRequest.type(), indexRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, "index", failure);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.delete.TransportDeleteAction;
@ -37,9 +36,9 @@ import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -62,6 +61,9 @@ import org.elasticsearch.transport.TransportService;
import java.util.Map;
import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException;
import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException;
/**
* Performs the index operation.
*/
@ -105,10 +107,11 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
@Override
protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(MetaData metaData, BulkShardRequest request) {
protected Tuple<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(BulkShardRequest request) {
ShardId shardId = request.shardId();
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.getId());
final IndexMetaData metaData = indexService.getIndexSettings().getIndexMetaData();
long[] preVersions = new long[request.items().length];
VersionType[] preVersionTypes = new VersionType[request.items().length];
@ -127,7 +130,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
return new Tuple<>(new BulkShardResponse(request.shardId(), responses), request);
}
private Translog.Location handleItem(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
private Translog.Location handleItem(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
if (item.request() instanceof IndexRequest) {
location = index(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
} else if (item.request() instanceof DeleteRequest) {
@ -145,7 +148,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
return location;
}
private Translog.Location index(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
private Translog.Location index(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
IndexRequest indexRequest = (IndexRequest) item.request();
preVersions[requestIndex] = indexRequest.version();
preVersionTypes[requestIndex] = indexRequest.versionType();
@ -220,7 +223,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
return location;
}
private Tuple<Translog.Location, BulkItemRequest> update(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
private Tuple<Translog.Location, BulkItemRequest> update(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) {
UpdateRequest updateRequest = (UpdateRequest) item.request();
preVersions[requestIndex] = updateRequest.version();
preVersionTypes[requestIndex] = updateRequest.versionType();
@ -326,19 +329,12 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
}
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, MetaData metaData,
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, IndexMetaData metaData,
IndexShard indexShard, boolean processed) throws Throwable {
// validate, if routing is required, that we got routing
MappingMetaData mappingMd = metaData.index(request.index()).mappingOrDefault(indexRequest.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (indexRequest.routing() == null) {
throw new RoutingMissingException(request.index(), indexRequest.type(), indexRequest.id());
}
}
MappingMetaData mappingMd = metaData.mappingOrDefault(indexRequest.type());
if (!processed) {
indexRequest.process(metaData, mappingMd, allowIdGeneration, request.index());
indexRequest.process(mappingMd, allowIdGeneration, request.index());
}
return TransportIndexAction.executeIndexRequestOnPrimary(indexRequest, indexShard, mappingUpdatedAction);
}
@ -396,7 +392,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
}
private UpdateResult shardUpdateOperation(MetaData metaData, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
private UpdateResult shardUpdateOperation(IndexMetaData metaData, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
switch (translate.operation()) {
case UPSERT:

View File

@ -30,14 +30,13 @@ import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
@ -63,7 +62,7 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
AutoCreateIndex autoCreateIndex) {
super(settings, DeleteAction.NAME, transportService, clusterService, indicesService, threadPool, shardStateAction,
actionFilters, indexNameExpressionResolver,
DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
DeleteRequest::new, DeleteRequest::new, ThreadPool.Names.INDEX);
this.createIndexAction = createIndexAction;
this.autoCreateIndex = autoCreateIndex;
}
@ -94,27 +93,19 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
}
@Override
protected void resolveRequest(final MetaData metaData, String concreteIndex, DeleteRequest request) {
resolveAndValidateRouting(metaData, concreteIndex, request);
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), concreteIndex, request.id(), request.routing());
protected void resolveRequest(final MetaData metaData, IndexMetaData indexMetaData, DeleteRequest request) {
resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request);
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
indexMetaData.getIndex().getName(), request.id(), request.routing());
request.setShardId(shardId);
}
public static void resolveAndValidateRouting(final MetaData metaData, String concreteIndex, DeleteRequest request) {
public static void resolveAndValidateRouting(final MetaData metaData, final String concreteIndex,
DeleteRequest request) {
request.routing(metaData.resolveIndexRouting(request.parent(), request.routing(), request.index()));
if (metaData.hasIndex(concreteIndex)) {
// check if routing is required, if so, throw error if routing wasn't specified
MappingMetaData mappingMd = metaData.index(concreteIndex).mappingOrDefault(request.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (request.routing() == null) {
if (request.versionType() != VersionType.INTERNAL) {
// TODO: implement this feature
throw new IllegalArgumentException("routing value is required for deleting documents of type [" + request.type()
+ "] while using version_type [" + request.versionType() + "]");
}
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
}
}
// check if routing is required, if so, throw error if routing wasn't specified
if (request.routing() == null && metaData.routingRequired(concreteIndex, request.type())) {
throw new RoutingMissingException(concreteIndex, request.type(), request.id());
}
}
@ -128,7 +119,7 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
}
@Override
protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(MetaData metaData, DeleteRequest request) {
protected Tuple<DeleteResponse, DeleteRequest> shardOperationOnPrimary(DeleteRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
final WriteResult<DeleteResponse> result = executeDeleteRequestOnPrimary(request, indexShard);
processAfterWrite(request.refresh(), indexShard, result.location);

View File

@ -582,10 +582,7 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
}
public void process(MetaData metaData, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration, String concreteIndex) {
// resolve the routing if needed
routing(metaData.resolveIndexRouting(parent, routing, index));
public void process(@Nullable MappingMetaData mappingMd, boolean allowIdGeneration, String concreteIndex) {
// resolve timestamp if provided externally
if (timestamp != null) {
timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp,
@ -638,6 +635,11 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
}
}
/* resolve the routing if needed */
public void resolveRouting(MetaData metaData) {
routing(metaData.resolveIndexRouting(parent, routing, index));
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -21,12 +21,12 @@ package org.elasticsearch.action.index;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
@ -122,13 +122,12 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
}
@Override
protected void resolveRequest(MetaData metaData, String concreteIndex, IndexRequest request) {
MappingMetaData mappingMd = null;
if (metaData.hasIndex(concreteIndex)) {
mappingMd = metaData.index(concreteIndex).mappingOrDefault(request.type());
}
request.process(metaData, mappingMd, allowIdGeneration, concreteIndex);
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), concreteIndex, request.id(), request.routing());
protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) {
MappingMetaData mappingMd =indexMetaData.mappingOrDefault(request.type());
request.resolveRouting(metaData);
request.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName());
ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(),
indexMetaData.getIndex().getName(), request.id(), request.routing());
request.setShardId(shardId);
}
@ -142,16 +141,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
}
@Override
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Exception {
// validate, if routing is required, that we got routing
IndexMetaData indexMetaData = metaData.getIndexSafe(request.shardId().getIndex());
MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (request.routing() == null) {
throw new RoutingMissingException(request.shardId().getIndex().getName(), request.type(), request.id());
}
}
protected Tuple<IndexResponse, IndexRequest> shardOperationOnPrimary(IndexRequest request) throws Exception {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().id());
@ -200,7 +190,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
/**
* Execute the given {@link IndexRequest} on a primary shard, throwing a
* {@link RetryOnPrimaryException} if the operation needs to be re-tried.
* {@link ReplicationOperation.RetryOnPrimaryException} if the operation needs to be re-tried.
*/
public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Exception {
Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard);
@ -211,7 +201,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
operation = prepareIndexOperationOnPrimary(request, indexShard);
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
throw new RetryOnPrimaryException(shardId,
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
"Dynamic mappings are not available on the node that holds the primary yet");
}
}

View File

@ -0,0 +1,348 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
public class ReplicationOperation<Request extends ReplicationRequest<Request>, ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse> {
final private ESLogger logger;
final private Request request;
final private Supplier<ClusterState> clusterStateSupplier;
final private String opType;
final private AtomicInteger totalShards = new AtomicInteger();
final private AtomicInteger pendingShards = new AtomicInteger();
final private AtomicInteger successfulShards = new AtomicInteger();
final private boolean executeOnReplicas;
final private boolean checkWriteConsistency;
final private Primary<Request, ReplicaRequest, Response> primary;
final private Replicas<ReplicaRequest> replicasProxy;
final private AtomicBoolean finished = new AtomicBoolean();
final protected ActionListener<Response> finalResponseListener;
private volatile Response finalResponse = null;
private final List<ReplicationResponse.ShardInfo.Failure> shardReplicaFailures = Collections.synchronizedList(new ArrayList<>());
ReplicationOperation(Request request, Primary<Request, ReplicaRequest, Response> primary,
ActionListener<Response> listener,
boolean executeOnReplicas, boolean checkWriteConsistency,
Replicas<ReplicaRequest> replicas,
Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
this.checkWriteConsistency = checkWriteConsistency;
this.executeOnReplicas = executeOnReplicas;
this.replicasProxy = replicas;
this.primary = primary;
this.finalResponseListener = listener;
this.logger = logger;
this.request = request;
this.clusterStateSupplier = clusterStateSupplier;
this.opType = opType;
}
void execute() throws Exception {
final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null;
final ShardId shardId = primary.routingEntry().shardId();
if (writeConsistencyFailure != null) {
finishAsFailed(new UnavailableShardsException(shardId,
"{} Timeout: [{}], request: [{}]", writeConsistencyFailure, request.timeout(), request));
return;
}
totalShards.incrementAndGet();
pendingShards.incrementAndGet(); // increase by 1 until we finish all primary coordination
Tuple<Response, ReplicaRequest> primaryResponse = primary.perform(request);
successfulShards.incrementAndGet(); // mark primary as successful
finalResponse = primaryResponse.v1();
ReplicaRequest replicaRequest = primaryResponse.v2();
assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
if (logger.isTraceEnabled()) {
logger.trace("[{}] op [{}] completed on primary for request [{}]", shardId, opType, request);
}
// we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
// we have to make sure that every operation indexed into the primary after recovery start will also be replicated
// to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
// If the index gets deleted after primary operation, we skip replication
List<ShardRouting> shards = getShards(shardId, clusterStateSupplier.get());
final String localNodeId = primary.routingEntry().currentNodeId();
for (final ShardRouting shard : shards) {
if (executeOnReplicas == false || shard.unassigned()) {
if (shard.primary() == false) {
totalShards.incrementAndGet();
}
continue;
}
if (shard.currentNodeId().equals(localNodeId) == false) {
performOnReplica(shard, replicaRequest);
}
if (shard.relocating() && shard.relocatingNodeId().equals(localNodeId) == false) {
performOnReplica(shard.buildTargetRelocatingShard(), replicaRequest);
}
}
// decrement pending and finish (if there are no replicas, or those are done)
decPendingAndFinishIfNeeded(); // incremented in the beginning of this method
}
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest) {
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
}
totalShards.incrementAndGet();
pendingShards.incrementAndGet();
replicasProxy.performOn(shard, replicaRequest, new ActionListener<TransportResponse.Empty>() {
@Override
public void onResponse(TransportResponse.Empty empty) {
successfulShards.incrementAndGet();
decPendingAndFinishIfNeeded();
}
@Override
public void onFailure(Throwable replicaException) {
logger.trace("[{}] failure while performing [{}] on replica {}, request [{}]", replicaException, shard.shardId(), opType,
shard, replicaRequest);
if (ignoreReplicaException(replicaException)) {
decPendingAndFinishIfNeeded();
} else {
RestStatus restStatus = ExceptionsHelper.status(replicaException);
shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
logger.warn("[{}] {}", replicaException, shard.shardId(), message);
replicasProxy.failShard(shard, primary.routingEntry(), message, replicaException,
ReplicationOperation.this::decPendingAndFinishIfNeeded,
ReplicationOperation.this::onPrimaryDemoted,
throwable -> decPendingAndFinishIfNeeded()
);
}
}
});
}
private void onPrimaryDemoted(Throwable demotionFailure) {
String primaryFail = String.format(Locale.ROOT,
"primary shard [%s] was demoted while failing replica shard",
primary.routingEntry());
// we are no longer the primary, fail ourselves and start over
primary.failShard(primaryFail, demotionFailure);
finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure));
}
/**
* checks whether we can perform a write based on the write consistency setting
* returns **null* if OK to proceed, or a string describing the reason to stop
*/
String checkWriteConsistency() {
assert request.consistencyLevel() != WriteConsistencyLevel.DEFAULT : "consistency level should be set";
final ShardId shardId = primary.routingEntry().shardId();
final ClusterState state = clusterStateSupplier.get();
final WriteConsistencyLevel consistencyLevel = request.consistencyLevel();
final int sizeActive;
final int requiredNumber;
IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName());
if (indexRoutingTable != null) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
if (shardRoutingTable != null) {
sizeActive = shardRoutingTable.activeShards().size();
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
// only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica,
// quorum is 1 (which is what it is initialized to)
requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardRoutingTable.getSize();
} else {
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}
} else {
sizeActive = 0;
requiredNumber = 1;
}
if (sizeActive < requiredNumber) {
logger.trace("[{}] not enough active copies to meet write consistency of [{}] (have {}, needed {}), scheduling a retry." +
" op [{}], request [{}]", shardId, consistencyLevel, sizeActive, requiredNumber, opType, request);
return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed "
+ requiredNumber + ").";
} else {
return null;
}
}
protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
// can be null if the index is deleted / closed on us..
final IndexShardRoutingTable shardRoutingTable = state.getRoutingTable().shardRoutingTableOrNull(shardId);
List<ShardRouting> shards = shardRoutingTable == null ? Collections.emptyList() : shardRoutingTable.shards();
return shards;
}
private void decPendingAndFinishIfNeeded() {
assert pendingShards.get() > 0;
if (pendingShards.decrementAndGet() == 0) {
finish();
}
}
private void finish() {
if (finished.compareAndSet(false, true)) {
final ReplicationResponse.ShardInfo.Failure[] failuresArray;
if (shardReplicaFailures.isEmpty()) {
failuresArray = ReplicationResponse.EMPTY;
} else {
failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
shardReplicaFailures.toArray(failuresArray);
}
finalResponse.setShardInfo(new ReplicationResponse.ShardInfo(
totalShards.get(),
successfulShards.get(),
failuresArray
)
);
finalResponseListener.onResponse(finalResponse);
}
}
private void finishAsFailed(Throwable throwable) {
if (finished.compareAndSet(false, true)) {
finalResponseListener.onFailure(throwable);
}
}
/**
* Should an exception be ignored when the operation is performed on the replica.
*/
public static boolean ignoreReplicaException(Throwable e) {
if (TransportActions.isShardNotAvailableException(e)) {
return true;
}
// on version conflict or document missing, it means
// that a new change has crept into the replica, and it's fine
if (isConflictException(e)) {
return true;
}
return false;
}
public static boolean isConflictException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
// on version conflict or document missing, it means
// that a new change has crept into the replica, and it's fine
if (cause instanceof VersionConflictEngineException) {
return true;
}
return false;
}
interface Primary<Request extends ReplicationRequest<Request>, ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse> {
/** routing entry for this primary */
ShardRouting routingEntry();
/** fail the primary, typically due to the fact that the operation has learned the primary has been demoted by the master */
void failShard(String message, Throwable throwable);
/**
* Performs the given request on this primary
*
* @return A tuple containing not null values, as first value the result of the primary operation and as second value
* the request to be executed on the replica shards.
*/
Tuple<Response, ReplicaRequest> perform(Request request) throws Exception;
}
interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> {
/**
* performs the the given request on the specified replica
*
* @param replica {@link ShardRouting} of the shard this request should be executed on
* @param replicaRequest operation to peform
* @param listener a callback to call once the operation has been complicated, either successfully or with an error.
*/
void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener<TransportResponse.Empty> listener);
/**
* Fail the specified shard, removing it from the current set of active shards
* @param replica shard to fail
* @param primary the primary shard that requested the failure
* @param message a (short) description of the reason
* @param throwable the original exception which caused the ReplicationOperation to request the shard to be failed
* @param onSuccess a callback to call when the shard has been successfully removed from the active set.
* @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
* by the master.
* @param onIgnoredFailure a callback to call when failing a shard has failed, but it that failure can be safely ignored and the
* replication operation can finish processing
* Note: this callback should be used in extreme situations, typically node shutdown.
*/
void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable, Runnable onSuccess,
Consumer<Throwable> onPrimaryDemoted, Consumer<Throwable> onIgnoredFailure);
}
public static class RetryOnPrimaryException extends ElasticsearchException {
public RetryOnPrimaryException(ShardId shardId, String msg) {
this(shardId, msg, null);
}
public RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) {
super(msg, cause);
setShard(shardId);
}
public RetryOnPrimaryException(StreamInput in) throws IOException {
super(in);
}
}
}

View File

@ -313,7 +313,10 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
/**
* The term of the current selected primary. This is a non-negative number incremented when
* a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary
* a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary.
*
* Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard
* that can be indexed into) is larger than 0.
* See {@link AllocationService#updateMetaDataWithRoutingTable(MetaData, RoutingTable, RoutingTable)}.
**/
public long primaryTerm(int shardId) {

View File

@ -28,46 +28,24 @@ import java.util.function.Supplier;
/**
* Base class for delegating transport response to a transport channel
*/
public abstract class TransportChannelResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
/**
* Convenience method for delegating an empty response to the provided transport channel
*/
public static TransportChannelResponseHandler<TransportResponse.Empty> emptyResponseHandler(ESLogger logger,
TransportChannel channel,
String extraInfoOnError) {
return new TransportChannelResponseHandler<TransportResponse.Empty>(logger, channel, extraInfoOnError) {
@Override
public TransportResponse.Empty newInstance() {
return TransportResponse.Empty.INSTANCE;
}
};
}
/**
* Convenience method for delegating a response provided by supplier to the provided transport channel
*/
public static <T extends TransportResponse> TransportChannelResponseHandler responseHandler(ESLogger logger,
Supplier<T> responseSupplier,
TransportChannel channel,
String extraInfoOnError) {
return new TransportChannelResponseHandler<T>(logger, channel, extraInfoOnError) {
@Override
public T newInstance() {
return responseSupplier.get();
}
};
}
public class TransportChannelResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
private final ESLogger logger;
private final TransportChannel channel;
private final String extraInfoOnError;
private final Supplier<T> responseSupplier;
protected TransportChannelResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) {
public TransportChannelResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError,
Supplier<T> responseSupplier) {
this.logger = logger;
this.channel = channel;
this.extraInfoOnError = extraInfoOnError;
this.responseSupplier = responseSupplier;
}
@Override
public T newInstance() {
return responseSupplier.get();
}
@Override

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.TimestampParsingException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.client.AbstractClientHeadersTestCase;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -761,7 +762,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(114, org.elasticsearch.index.shard.IndexShardRecoveringException.class);
ids.put(115, org.elasticsearch.index.translog.TranslogException.class);
ids.put(116, org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException.class);
ids.put(117, org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException.class);
ids.put(117, ReplicationOperation.RetryOnPrimaryException.class);
ids.put(118, org.elasticsearch.ElasticsearchTimeoutException.class);
ids.put(119, org.elasticsearch.search.query.QueryPhaseExecutionException.class);
ids.put(120, org.elasticsearch.repositories.RepositoryVerificationException.class);

View File

@ -86,7 +86,7 @@ public class ClusterStateCreationUtils {
}
discoBuilder.localNodeId(newNode(0).getId());
discoBuilder.masterNodeId(newNode(1).getId()); // we need a non-local master to test shard failures
final int primaryTerm = randomInt(200);
final int primaryTerm = 1 + randomInt(200);
IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)

View File

@ -0,0 +1,461 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.IndexShardNotStartedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.TransportResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class ReplicationOperationTests extends ESTestCase {
public void testReplication() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
final long primaryTerm = state.getMetaData().index(index).primaryTerm(0);
final IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().shardRoutingTable(shardId);
ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
if (primaryShard.relocating() && randomBoolean()) {
// simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
state = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
primaryShard = primaryShard.buildTargetRelocatingShard();
}
final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state);
final Map<ShardRouting, Throwable> expectedFailures = new HashMap<>();
final Set<ShardRouting> expectedFailedShards = new HashSet<>();
for (ShardRouting replica : expectedReplicas) {
if (randomBoolean()) {
Throwable t;
boolean criticalFailure = randomBoolean();
if (criticalFailure) {
t = new CorruptIndexException("simulated", (String) null);
} else {
t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
}
logger.debug("--> simulating failure on {} with [{}]", replica, t.getClass().getSimpleName());
expectedFailures.put(replica, t);
if (criticalFailure) {
expectedFailedShards.add(replica);
}
}
}
Request request = new Request(shardId);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final ClusterState finalState = state;
final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures);
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, primaryTerm), listener, replicasProxy, () -> finalState);
op.execute();
assertThat(request.primaryTerm(), equalTo(primaryTerm));
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
assertThat(replicasProxy.failedReplicas, equalTo(expectedFailedShards));
assertTrue("listener is not marked as done", listener.isDone());
Response.ShardInfo shardInfo = listener.actionGet().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(expectedFailedShards.size()));
assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailedShards.size()));
assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size()));
final List<ShardRouting> unassignedShards =
indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED);
final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size();
assertThat(shardInfo.getTotal(), equalTo(totalShards));
}
public void testReplicationWithShadowIndex() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
final ClusterState state = stateWithActivePrimary(index, true, randomInt(5));
final long primaryTerm = state.getMetaData().index(index).primaryTerm(0);
final IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().shardRoutingTable(shardId);
final ShardRouting primaryShard = indexShardRoutingTable.primaryShard();
Request request = new Request(shardId);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, primaryTerm), listener, false, false,
new TestReplicaProxy(), () -> state, logger, "test");
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertThat(request.processedOnReplicas, equalTo(Collections.emptySet()));
assertTrue("listener is not marked as done", listener.isDone());
Response.ShardInfo shardInfo = listener.actionGet().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(0));
assertThat(shardInfo.getFailures(), arrayWithSize(0));
assertThat(shardInfo.getSuccessful(), equalTo(1));
assertThat(shardInfo.getTotal(), equalTo(indexShardRoutingTable.getSize()));
}
public void testDemotedPrimary() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(2), randomInt(2));
final long primaryTerm = state.getMetaData().index(index).primaryTerm(0);
ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
if (primaryShard.relocating() && randomBoolean()) {
// simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated
state = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build();
primaryShard = primaryShard.buildTargetRelocatingShard();
}
final Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state);
final Map<ShardRouting, Throwable> expectedFailures = new HashMap<>();
final ShardRouting failedReplica = randomFrom(new ArrayList<>(expectedReplicas));
expectedFailures.put(failedReplica, new CorruptIndexException("simulated", (String) null));
Request request = new Request(shardId);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final ClusterState finalState = state;
final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) {
@Override
public void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable,
Runnable onSuccess, Consumer<Throwable> onPrimaryDemoted,
Consumer<Throwable> onIgnoredFailure) {
assertThat(replica, equalTo(failedReplica));
onPrimaryDemoted.accept(new ElasticsearchException("the king is dead"));
}
};
AtomicBoolean primaryFailed = new AtomicBoolean();
final TestPrimary primary = new TestPrimary(primaryShard, primaryTerm) {
@Override
public void failShard(String message, Throwable throwable) {
assertTrue(primaryFailed.compareAndSet(false, true));
}
};
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy,
() -> finalState);
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
assertTrue("listener is not marked as done", listener.isDone());
assertTrue(primaryFailed.get());
assertListenerThrows("should throw exception to trigger retry", listener,
ReplicationOperation.RetryOnPrimaryException.class);
}
public void testAddedReplicaAfterPrimaryOperation() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
final ClusterState initialState = stateWithActivePrimary(index, true, 0);
final ClusterState stateWithAddedReplicas;
if (randomBoolean()) {
stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED,
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED);
} else {
stateWithAddedReplicas = state(index, true, ShardRoutingState.RELOCATING);
}
testClusterStateChangeAfterPrimaryOperation(shardId, initialState, stateWithAddedReplicas);
}
public void testIndexDeletedAfterPrimaryOperation() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
final ClusterState initialState = state(index, true, ShardRoutingState.STARTED, ShardRoutingState.STARTED);
final ClusterState stateWithDeletedIndex = state(index + "_new", true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
testClusterStateChangeAfterPrimaryOperation(shardId, initialState, stateWithDeletedIndex);
}
private void testClusterStateChangeAfterPrimaryOperation(final ShardId shardId,
final ClusterState initialState,
final ClusterState changedState) throws Exception {
AtomicReference<ClusterState> state = new AtomicReference<>(initialState);
logger.debug("--> using initial state:\n{}", state.get().prettyPrint());
final long primaryTerm = initialState.getMetaData().index(shardId.getIndexName()).primaryTerm(shardId.id());
final ShardRouting primaryShard = state.get().routingTable().shardRoutingTable(shardId).primaryShard();
final TestPrimary primary = new TestPrimary(primaryShard, primaryTerm) {
@Override
public Tuple<Response, Request> perform(Request request) throws Exception {
final Tuple<Response, Request> tuple = super.perform(request);
state.set(changedState);
logger.debug("--> state after primary operation:\n{}", state.get().prettyPrint());
return tuple;
}
};
Request request = new Request(shardId);
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener,
new TestReplicaProxy(), state::get);
op.execute();
assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true));
Set<ShardRouting> expectedReplicas = getExpectedReplicas(shardId, state.get());
assertThat(request.processedOnReplicas, equalTo(expectedReplicas));
}
public void testWriteConsistency() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
final int assignedReplicas = randomInt(2);
final int unassignedReplicas = randomInt(2);
final int totalShards = 1 + assignedReplicas + unassignedReplicas;
final boolean passesWriteConsistency;
Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values()));
switch (request.consistencyLevel()) {
case ONE:
passesWriteConsistency = true;
break;
case DEFAULT:
case QUORUM:
if (totalShards <= 2) {
passesWriteConsistency = true; // primary is enough
} else {
passesWriteConsistency = assignedReplicas + 1 >= (totalShards / 2) + 1;
}
// we have to reset default (as the transport replication action will do)
request.consistencyLevel(WriteConsistencyLevel.QUORUM);
break;
case ALL:
passesWriteConsistency = unassignedReplicas == 0;
break;
default:
throw new RuntimeException("unknown consistency level [" + request.consistencyLevel() + "]");
}
ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas];
for (int i = 0; i < assignedReplicas; i++) {
replicaStates[i] = randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING);
}
for (int i = assignedReplicas; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.UNASSIGNED;
}
final ClusterState state = state(index, true, ShardRoutingState.STARTED, replicaStates);
logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]." +
" expecting op to [{}]. using state: \n{}",
request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas,
passesWriteConsistency ? "succeed" : "retry",
state.prettyPrint());
final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id());
final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id());
PlainActionFuture<Response> listener = new PlainActionFuture<>();
final ShardRouting primaryShard = shardRoutingTable.primaryShard();
final TestReplicationOperation op = new TestReplicationOperation(request,
new TestPrimary(primaryShard, primaryTerm),
listener, randomBoolean(), true, new TestReplicaProxy(), () -> state, logger, "test");
if (passesWriteConsistency) {
assertThat(op.checkWriteConsistency(), nullValue());
op.execute();
assertTrue("operations should have been performed, consistency level is met",
request.processedOnPrimary.get());
} else {
assertThat(op.checkWriteConsistency(), notNullValue());
op.execute();
assertFalse("operations should not have been perform, consistency level is *NOT* met",
request.processedOnPrimary.get());
assertListenerThrows("should throw exception to trigger retry", listener, UnavailableShardsException.class);
}
}
private Set<ShardRouting> getExpectedReplicas(ShardId shardId, ClusterState state) {
Set<ShardRouting> expectedReplicas = new HashSet<>();
String localNodeId = state.nodes().getLocalNodeId();
if (state.routingTable().hasIndex(shardId.getIndexName())) {
for (ShardRouting shardRouting : state.routingTable().shardRoutingTable(shardId)) {
if (shardRouting.unassigned()) {
continue;
}
if (localNodeId.equals(shardRouting.currentNodeId()) == false) {
expectedReplicas.add(shardRouting);
}
if (shardRouting.relocating() && localNodeId.equals(shardRouting.relocatingNodeId()) == false) {
expectedReplicas.add(shardRouting.buildTargetRelocatingShard());
}
}
}
return expectedReplicas;
}
public static class Request extends ReplicationRequest<Request> {
public AtomicBoolean processedOnPrimary = new AtomicBoolean();
public Set<ShardRouting> processedOnReplicas = ConcurrentCollections.newConcurrentSet();
public Request() {
}
Request(ShardId shardId) {
this();
this.shardId = shardId;
this.index = shardId.getIndexName();
// keep things simple
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
}
static class Response extends ReplicationResponse {
}
static class TestPrimary implements ReplicationOperation.Primary<Request, Request, Response> {
final ShardRouting routing;
final long term;
TestPrimary(ShardRouting routing, long term) {
this.routing = routing;
this.term = term;
}
@Override
public ShardRouting routingEntry() {
return routing;
}
@Override
public void failShard(String message, Throwable throwable) {
throw new AssertionError("should shouldn't be failed with [" + message + "]", throwable);
}
@Override
public Tuple<Response, Request> perform(Request request) throws Exception {
if (request.processedOnPrimary.compareAndSet(false, true) == false) {
fail("processed [" + request + "] twice");
}
request.primaryTerm(term);
return new Tuple<>(new Response(), request);
}
}
static class TestReplicaProxy implements ReplicationOperation.Replicas<Request> {
final Map<ShardRouting, Throwable> opFailures;
final Set<ShardRouting> failedReplicas = ConcurrentCollections.newConcurrentSet();
TestReplicaProxy() {
this(Collections.emptyMap());
}
TestReplicaProxy(Map<ShardRouting, Throwable> opFailures) {
this.opFailures = opFailures;
}
@Override
public void performOn(ShardRouting replica, Request request, ActionListener<TransportResponse.Empty> listener) {
assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica));
if (opFailures.containsKey(replica)) {
listener.onFailure(opFailures.get(replica));
} else {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}
}
@Override
public void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable, Runnable onSuccess,
Consumer<Throwable> onPrimaryDemoted, Consumer<Throwable> onIgnoredFailure) {
if (failedReplicas.add(replica) == false) {
fail("replica [" + replica + "] was failed twice");
}
if (opFailures.containsKey(replica)) {
if (randomBoolean()) {
onSuccess.run();
} else {
onIgnoredFailure.accept(new ElasticsearchException("simulated"));
}
} else {
fail("replica [" + replica + "] was failed");
}
}
}
class TestReplicationOperation extends ReplicationOperation<Request, Request, Response> {
public TestReplicationOperation(Request request, Primary<Request, Request, Response> primary, ActionListener<Response> listener,
Replicas<Request> replicas, Supplier<ClusterState> clusterStateSupplier) {
this(request, primary, listener, true, false, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test");
}
public TestReplicationOperation(Request request, Primary<Request, Request, Response> primary, ActionListener<Response> listener,
boolean executeOnReplicas, boolean checkWriteConsistency, Replicas<Request> replicas,
Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
super(request, primary, listener, executeOnReplicas, checkWriteConsistency, replicas, clusterStateSupplier, logger, opType);
}
}
<T> void assertListenerThrows(String msg, PlainActionFuture<T> listener, Class<?> klass) throws InterruptedException {
try {
listener.get();
fail(msg);
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(klass));
}
}
}

View File

@ -132,32 +132,6 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
assertThat(disabledMapper.timestampFieldMapper().enabled(), is(false));
}
// Issue 4718: was throwing a TimestampParsingException: failed to parse timestamp [null]
public void testTimestampDefaultValue() throws Exception {
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("_timestamp")
.field("enabled", "yes")
.endObject()
.endObject().endObject();
XContentBuilder doc = XContentFactory.jsonBuilder()
.startObject()
.field("foo", "bar")
.endObject();
MetaData metaData = MetaData.builder().build();
DocumentMapper docMapper = createIndex("test").mapperService().documentMapperParser().parse("type", new CompressedXContent(mapping.string()));
MappingMetaData mappingMetaData = new MappingMetaData(docMapper);
IndexRequest request = new IndexRequest("test", "type", "1").source(doc);
request.process(metaData, mappingMetaData, true, "test");
assertThat(request.timestamp(), notNullValue());
// We should have less than one minute (probably some ms)
long delay = System.currentTimeMillis() - Long.parseLong(request.timestamp());
assertThat(delay, lessThanOrEqualTo(60000L));
}
// Issue 4718: was throwing a TimestampParsingException: failed to parse timestamp [null]
public void testTimestampMissingDefaultToEpochValue() throws Exception {
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
@ -178,7 +152,7 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
MappingMetaData mappingMetaData = new MappingMetaData(docMapper);
IndexRequest request = new IndexRequest("test", "type", "1").source(doc);
request.process(metaData, mappingMetaData, true, "test");
request.process(mappingMetaData, true, "test");
assertThat(request.timestamp(), notNullValue());
assertThat(request.timestamp(), is(MappingMetaData.Timestamp.parseStringTimestamp("1970-01-01", Joda.forPattern("YYYY-MM-dd"))));
}
@ -203,7 +177,7 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
MappingMetaData mappingMetaData = new MappingMetaData(docMapper);
IndexRequest request = new IndexRequest("test", "type", "1").source(doc);
request.process(metaData, mappingMetaData, true, "test");
request.process(mappingMetaData, true, "test");
assertThat(request.timestamp(), notNullValue());
// We should have less than one minute (probably some ms)
@ -281,7 +255,7 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
MappingMetaData mappingMetaData = new MappingMetaData(docMapper);
IndexRequest request = new IndexRequest("test", "type", "1").source(doc);
request.process(metaData, mappingMetaData, true, "test");
request.process(mappingMetaData, true, "test");
assertThat(request.timestamp(), notNullValue());
@ -407,7 +381,7 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
XContentBuilder doc = XContentFactory.jsonBuilder().startObject().endObject();
IndexRequest request = new IndexRequest("test", "type", "1").source(doc).timestamp("2015060210");
MappingMetaData mappingMetaData = new MappingMetaData(docMapper);
request.process(metaData, mappingMetaData, true, "test");
request.process(mappingMetaData, true, "test");
assertThat(request.timestamp(), is("1433239200000"));
}
@ -419,16 +393,15 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
BytesReference source = XContentFactory.jsonBuilder().startObject().field("field", "value").endObject().bytes();
// test with 2.x
DocumentMapper currentMapper = createIndex("new-index").mapperService().documentMapperParser().parse("type", new CompressedXContent(mapping));
MetaData newMetaData = client().admin().cluster().prepareState().get().getState().getMetaData();
// this works with 2.x
IndexRequest request = new IndexRequest("new-index", "type", "1").source(source).timestamp("1970-01-01");
request.process(newMetaData, new MappingMetaData(currentMapper), true, "new-index");
request.process(new MappingMetaData(currentMapper), true, "new-index");
// this fails with 2.x
request = new IndexRequest("new-index", "type", "1").source(source).timestamp("1234567890");
try {
request.process(newMetaData, new MappingMetaData(currentMapper), true, "new-index");
request.process(new MappingMetaData(currentMapper), true, "new-index");
} catch (Exception e) {
assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
assertThat(e.getMessage(), containsString("failed to parse timestamp [1234567890]"));

View File

@ -46,7 +46,6 @@ import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.AllocationId;
@ -1071,7 +1070,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
// we can't issue this request through a client because of the inconsistencies we created with the cluster state
// doing it directly instead
IndexRequest request = client().prepareIndex("test", "test", "0").setSource("{}").request();
request.process(MetaData.builder().put(test.getMetaData(), false).build(), null, false, "test");
request.process(null, false, "test");
TransportIndexAction.executeIndexRequestOnPrimary(request, newShard, null);
newShard.refresh("test");
assertHitCount(client().prepareSearch().get(), 1);