From 7cd128b372b0011814df76247bafbbf55709ae3f Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Sat, 6 Feb 2016 11:15:24 +0100
Subject: [PATCH] Extract non-transport primary logic from TransportReplicationAction #16492

Extracts all the replication logic that is done on the primary to a separate class called
ReplicationOperation. The goal here is to make unit testing of this logic easier and, in the
future, to allow setting up tests that work directly on IndexShards without the need for
networking.

Closes #16492
---
 .../resources/checkstyle_suppressions.xml     |   2 -
 .../elasticsearch/ElasticsearchException.java |   5 +-
 .../flush/TransportShardFlushAction.java      |   3 +-
 .../refresh/TransportShardRefreshAction.java  |   3 +-
 .../action/bulk/TransportBulkAction.java      |   3 +-
 .../action/bulk/TransportShardBulkAction.java |  30 +-
 .../action/delete/TransportDeleteAction.java  |  33 +-
 .../action/index/IndexRequest.java            |  10 +-
 .../action/index/TransportIndexAction.java    |  30 +-
 .../replication/ReplicationOperation.java     | 348 +++++++
 .../TransportReplicationAction.java           | 795 ++++----------
 .../cluster/metadata/IndexMetaData.java       |   5 +-
 .../TransportChannelResponseHandler.java      |  42 +-
 .../ExceptionSerializationTests.java          |   3 +-
 .../ClusterStateCreationUtils.java            |   2 +-
 .../ReplicationOperationTests.java            | 461 ++++++++
 .../TransportReplicationActionTests.java      | 980 +++++-------
 .../timestamp/TimestampMappingTests.java      |  39 +-
 .../index/shard/IndexShardTests.java          |   3 +-
 19 files changed, 1368 insertions(+), 1429 deletions(-)
 create mode 100644 core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java
 create mode 100644 core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 3b4655796da..b25e2a69e66 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -233,7 +233,6 @@
-
@@ -849,7 +848,6 @@
-
diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
index 7fd81f5ddfe..e6ed0e236f1 100644
--- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java
+++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java
@@ -19,6 +19,7 @@
 package org.elasticsearch;
+import org.elasticsearch.action.support.replication.ReplicationOperation;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -696,8 +697,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
         org.elasticsearch.index.translog.TranslogException::new, 115),
     PROCESS_CLUSTER_EVENT_TIMEOUT_EXCEPTION(org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException.class,
         org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException::new, 116),
-    RETRY_ON_PRIMARY_EXCEPTION(org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException.class,
-        org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException::new, 117),
+    RETRY_ON_PRIMARY_EXCEPTION(ReplicationOperation.RetryOnPrimaryException.class,
+        ReplicationOperation.RetryOnPrimaryException::new, 117),
     ELASTICSEARCH_TIMEOUT_EXCEPTION(org.elasticsearch.ElasticsearchTimeoutException.class,
org.elasticsearch.ElasticsearchTimeoutException::new, 118), QUERY_PHASE_EXECUTION_EXCEPTION(org.elasticsearch.search.query.QueryPhaseExecutionException.class, diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 3c22209813f..7e750b97677 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -56,7 +55,7 @@ public class TransportShardFlushAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) { + protected Tuple shardOperationOnPrimary(ShardFlushRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.flush(shardRequest.getRequest()); logger.trace("{} flush request executed on primary", indexShard.shardId()); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index e3155614337..0670c1f3cc6 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -58,7 +57,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, BasicReplicationRequest shardRequest) { + protected Tuple shardOperationOnPrimary(BasicReplicationRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.refresh("api"); logger.trace("{} refresh request executed on primary", indexShard.shardId()); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 013b170c00f..667e691f6c8 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -261,7 +261,8 @@ public class TransportBulkAction extends HandledTransportAction shardOperationOnPrimary(MetaData metaData, BulkShardRequest 
request) { + protected Tuple shardOperationOnPrimary(BulkShardRequest request) { ShardId shardId = request.shardId(); final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard indexShard = indexService.getShard(shardId.getId()); + final IndexMetaData metaData = indexService.getIndexSettings().getIndexMetaData(); long[] preVersions = new long[request.items().length]; VersionType[] preVersionTypes = new VersionType[request.items().length]; @@ -127,7 +130,7 @@ public class TransportShardBulkAction extends TransportReplicationAction(new BulkShardResponse(request.shardId(), responses), request); } - private Translog.Location handleItem(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) { + private Translog.Location handleItem(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) { if (item.request() instanceof IndexRequest) { location = index(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item); } else if (item.request() instanceof DeleteRequest) { @@ -145,7 +148,7 @@ public class TransportShardBulkAction extends TransportReplicationAction update(MetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) { + private Tuple update(IndexMetaData metaData, BulkShardRequest request, IndexShard indexShard, long[] preVersions, VersionType[] preVersionTypes, Translog.Location location, int requestIndex, BulkItemRequest item) { UpdateRequest updateRequest = (UpdateRequest) item.request(); preVersions[requestIndex] = updateRequest.version(); preVersionTypes[requestIndex] = updateRequest.versionType(); @@ -326,19 +329,12 @@ public class TransportShardBulkAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, DeleteRequest request) { + protected Tuple shardOperationOnPrimary(DeleteRequest request) { IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); final WriteResult result = executeDeleteRequestOnPrimary(request, indexShard); processAfterWrite(request.refresh(), indexShard, result.location); diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index f8b0ce6c13f..31accebc3b2 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -582,10 +582,7 @@ public class IndexRequest extends ReplicationRequest implements Do } - public void process(MetaData metaData, @Nullable MappingMetaData mappingMd, boolean allowIdGeneration, String concreteIndex) { - // resolve the routing if needed - routing(metaData.resolveIndexRouting(parent, routing, index)); - + public void process(@Nullable MappingMetaData mappingMd, boolean allowIdGeneration, String concreteIndex) { // resolve timestamp if provided externally if (timestamp != null) { timestamp = MappingMetaData.Timestamp.parseStringTimestamp(timestamp, @@ -638,6 +635,11 @@ public class IndexRequest extends ReplicationRequest implements Do } } + /* resolve the routing if needed */ + public void 
resolveRouting(MetaData metaData) { + routing(metaData.resolveIndexRouting(parent, routing, index)); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 9be8e4cef89..3915e23c2ed 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -21,12 +21,12 @@ package org.elasticsearch.action.index; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; @@ -122,13 +122,12 @@ public class TransportIndexAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Exception { - - // validate, if routing is required, that we got routing - IndexMetaData indexMetaData = metaData.getIndexSafe(request.shardId().getIndex()); - MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type()); - if (mappingMd != null && mappingMd.routing().required()) { - if (request.routing() == null) { - throw new RoutingMissingException(request.shardId().getIndex().getName(), request.type(), request.id()); - } - } + protected Tuple shardOperationOnPrimary(IndexRequest request) throws Exception { IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard indexShard = indexService.getShard(request.shardId().id()); @@ -200,7 +190,7 @@ public class TransportIndexAction extends TransportReplicationAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Exception { Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard); @@ -211,7 +201,7 @@ public class TransportIndexAction extends TransportReplicationAction, ReplicaRequest extends ReplicationRequest, + Response extends ReplicationResponse> { + final private ESLogger logger; + final private Request request; + final private Supplier clusterStateSupplier; + final private String opType; + final private AtomicInteger totalShards = new AtomicInteger(); + final private AtomicInteger pendingShards = new AtomicInteger(); + final private AtomicInteger successfulShards = new AtomicInteger(); + final private boolean executeOnReplicas; + final private boolean checkWriteConsistency; + final private Primary primary; + final private Replicas replicasProxy; + final private AtomicBoolean finished = new AtomicBoolean(); + final protected ActionListener finalResponseListener; + + private volatile Response finalResponse = null; + + private final List shardReplicaFailures = Collections.synchronizedList(new ArrayList<>()); + + ReplicationOperation(Request request, Primary primary, + 
ActionListener<Response> listener,
+                         boolean executeOnReplicas, boolean checkWriteConsistency,
+                         Replicas<ReplicaRequest> replicas,
+                         Supplier<ClusterState> clusterStateSupplier, ESLogger logger, String opType) {
+        this.checkWriteConsistency = checkWriteConsistency;
+        this.executeOnReplicas = executeOnReplicas;
+        this.replicasProxy = replicas;
+        this.primary = primary;
+        this.finalResponseListener = listener;
+        this.logger = logger;
+        this.request = request;
+        this.clusterStateSupplier = clusterStateSupplier;
+        this.opType = opType;
+    }
+
+    void execute() throws Exception {
+        final String writeConsistencyFailure = checkWriteConsistency ? checkWriteConsistency() : null;
+        final ShardId shardId = primary.routingEntry().shardId();
+        if (writeConsistencyFailure != null) {
+            finishAsFailed(new UnavailableShardsException(shardId,
+                "{} Timeout: [{}], request: [{}]", writeConsistencyFailure, request.timeout(), request));
+            return;
+        }
+
+        totalShards.incrementAndGet();
+        pendingShards.incrementAndGet(); // increase by 1 until we finish all primary coordination
+        Tuple<Response, ReplicaRequest> primaryResponse = primary.perform(request);
+        successfulShards.incrementAndGet(); // mark primary as successful
+        finalResponse = primaryResponse.v1();
+        ReplicaRequest replicaRequest = primaryResponse.v2();
+        assert replicaRequest.primaryTerm() > 0 : "replicaRequest doesn't have a primary term";
+        if (logger.isTraceEnabled()) {
+            logger.trace("[{}] op [{}] completed on primary for request [{}]", shardId, opType, request);
+        }
+        // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics.
+        // we have to make sure that every operation indexed into the primary after recovery start will also be replicated
+        // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then.
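+        // (operations indexed before the recovery started reach the target through the recovery itself,
+        // so sampling a fresh state here ensures the two paths together cover every operation)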
+        // If the index gets deleted after primary operation, we skip replication
+        List<ShardRouting> shards = getShards(shardId, clusterStateSupplier.get());
+        final String localNodeId = primary.routingEntry().currentNodeId();
+        for (final ShardRouting shard : shards) {
+            if (executeOnReplicas == false || shard.unassigned()) {
+                if (shard.primary() == false) {
+                    totalShards.incrementAndGet();
+                }
+                continue;
+            }
+
+            if (shard.currentNodeId().equals(localNodeId) == false) {
+                performOnReplica(shard, replicaRequest);
+            }
+
+            if (shard.relocating() && shard.relocatingNodeId().equals(localNodeId) == false) {
+                performOnReplica(shard.buildTargetRelocatingShard(), replicaRequest);
+            }
+        }
+
+        // decrement pending and finish (if there are no replicas, or those are done)
+        decPendingAndFinishIfNeeded(); // incremented in the beginning of this method
+    }
+
+    private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest) {
+        if (logger.isTraceEnabled()) {
+            logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
+        }
+
+        totalShards.incrementAndGet();
+        pendingShards.incrementAndGet();
+        replicasProxy.performOn(shard, replicaRequest, new ActionListener<TransportResponse.Empty>() {
+            @Override
+            public void onResponse(TransportResponse.Empty empty) {
+                successfulShards.incrementAndGet();
+                decPendingAndFinishIfNeeded();
+            }
+
+            @Override
+            public void onFailure(Throwable replicaException) {
+                logger.trace("[{}] failure while performing [{}] on replica {}, request [{}]", replicaException, shard.shardId(), opType,
+                    shard, replicaRequest);
+                if (ignoreReplicaException(replicaException)) {
+                    decPendingAndFinishIfNeeded();
+                } else {
+                    RestStatus restStatus = ExceptionsHelper.status(replicaException);
+                    shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
+                        shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
+                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
+                    logger.warn("[{}] {}", replicaException, shard.shardId(), message);
+                    replicasProxy.failShard(shard, primary.routingEntry(), message, replicaException,
+                        ReplicationOperation.this::decPendingAndFinishIfNeeded,
+                        ReplicationOperation.this::onPrimaryDemoted,
+                        throwable -> decPendingAndFinishIfNeeded()
+                    );
+                }
+            }
+        });
+    }
+
+    private void onPrimaryDemoted(Throwable demotionFailure) {
+        String primaryFail = String.format(Locale.ROOT,
+            "primary shard [%s] was demoted while failing replica shard",
+            primary.routingEntry());
+        // we are no longer the primary, fail ourselves and start over
+        primary.failShard(primaryFail, demotionFailure);
+        finishAsFailed(new RetryOnPrimaryException(primary.routingEntry().shardId(), primaryFail, demotionFailure));
+    }
+
+    /**
+     * Checks whether we can perform a write based on the write consistency setting.
+     * Returns *null* if it is OK to proceed, or a string describing the reason to stop.
+     */
+    String checkWriteConsistency() {
+        assert request.consistencyLevel() != WriteConsistencyLevel.DEFAULT : "consistency level should be set";
+        final ShardId shardId = primary.routingEntry().shardId();
+        final ClusterState state = clusterStateSupplier.get();
+        final WriteConsistencyLevel consistencyLevel = request.consistencyLevel();
+        final int sizeActive;
+        final int requiredNumber;
+        IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName());
+        if (indexRoutingTable != null) {
+            IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId());
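+            // the routing table may no longer have an entry for this shard if the index was concurrently deleted or closed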
+            if (shardRoutingTable != null) {
+                sizeActive = shardRoutingTable.activeShards().size();
+                if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) {
+                    // a quorum only makes sense with more than 2 shard copies; otherwise it's 1 shard with 1 replica
+                    // and the quorum is 1 (which is what requiredNumber is set to in the else branch below)
+                    requiredNumber = (shardRoutingTable.getSize() / 2) + 1;
+                } else if (consistencyLevel == WriteConsistencyLevel.ALL) {
+                    requiredNumber = shardRoutingTable.getSize();
+                } else {
+                    requiredNumber = 1;
+                }
+            } else {
+                sizeActive = 0;
+                requiredNumber = 1;
+            }
+        } else {
+            sizeActive = 0;
+            requiredNumber = 1;
+        }
+
+        if (sizeActive < requiredNumber) {
+            logger.trace("[{}] not enough active copies to meet write consistency of [{}] (have {}, needed {}), scheduling a retry." +
+                " op [{}], request [{}]", shardId, consistencyLevel, sizeActive, requiredNumber, opType, request);
+            return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed "
+                + requiredNumber + ").";
+        } else {
+            return null;
+        }
+    }
+
+    protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
+        // can be null if the index is deleted / closed on us.
+        final IndexShardRoutingTable shardRoutingTable = state.getRoutingTable().shardRoutingTableOrNull(shardId);
+        List<ShardRouting> shards = shardRoutingTable == null ? Collections.emptyList() : shardRoutingTable.shards();
+        return shards;
+    }
+
+    private void decPendingAndFinishIfNeeded() {
+        assert pendingShards.get() > 0;
+        if (pendingShards.decrementAndGet() == 0) {
+            finish();
+        }
+    }
+
+    private void finish() {
+        if (finished.compareAndSet(false, true)) {
+            final ReplicationResponse.ShardInfo.Failure[] failuresArray;
+            if (shardReplicaFailures.isEmpty()) {
+                failuresArray = ReplicationResponse.EMPTY;
+            } else {
+                failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
+                shardReplicaFailures.toArray(failuresArray);
+            }
+            finalResponse.setShardInfo(new ReplicationResponse.ShardInfo(
+                    totalShards.get(),
+                    successfulShards.get(),
+                    failuresArray
+                )
+            );
+            finalResponseListener.onResponse(finalResponse);
+        }
+    }
+
+    private void finishAsFailed(Throwable throwable) {
+        if (finished.compareAndSet(false, true)) {
+            finalResponseListener.onFailure(throwable);
+        }
+    }
+
+
+    /**
+     * Should an exception be ignored when the operation is performed on the replica?
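+     * (shard-not-available failures are handled by allocation retries, and a version conflict means the replica already
+     * holds a newer change, so neither should fail the overall operation)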
+     */
+    public static boolean ignoreReplicaException(Throwable e) {
+        if (TransportActions.isShardNotAvailableException(e)) {
+            return true;
+        }
+        // on version conflict or document missing, it means
+        // that a new change has crept into the replica, and it's fine
+        if (isConflictException(e)) {
+            return true;
+        }
+        return false;
+    }
+
+    public static boolean isConflictException(Throwable e) {
+        Throwable cause = ExceptionsHelper.unwrapCause(e);
+        // on version conflict or document missing, it means
+        // that a new change has crept into the replica, and it's fine
+        if (cause instanceof VersionConflictEngineException) {
+            return true;
+        }
+        return false;
+    }
+
+
+    interface Primary<Request extends ReplicationRequest<Request>, ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
+        Response extends ReplicationResponse> {
+
+        /** routing entry for this primary */
+        ShardRouting routingEntry();
+
+        /** fail the primary, typically because the operation has learned that the primary has been demoted by the master */
+        void failShard(String message, Throwable throwable);
+
+        /**
+         * Performs the given request on this primary
+         *
+         * @return A tuple containing non-null values: as first value the result of the primary operation and as second value
+         * the request to be executed on the replica shards.
+         */
+        Tuple<Response, ReplicaRequest> perform(Request request) throws Exception;
+
+    }
+
+    interface Replicas<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> {
+
+        /**
+         * performs the given request on the specified replica
+         *
+         * @param replica        {@link ShardRouting} of the shard this request should be executed on
+         * @param replicaRequest operation to perform
+         * @param listener       a callback to call once the operation has been completed, either successfully or with an error.
+         */
+        void performOn(ShardRouting replica, ReplicaRequest replicaRequest, ActionListener<TransportResponse.Empty> listener);
+
+        /**
+         * Fail the specified shard, removing it from the current set of active shards
+         * @param replica          shard to fail
+         * @param primary          the primary shard that requested the failure
+         * @param message          a (short) description of the reason
+         * @param throwable        the original exception which caused the ReplicationOperation to request the shard to be failed
+         * @param onSuccess        a callback to call when the shard has been successfully removed from the active set.
+         * @param onPrimaryDemoted a callback to call when the shard can not be failed because the current primary has been demoted
+         *                         by the master.
+         * @param onIgnoredFailure a callback to call when failing a shard has failed, but that failure can be safely ignored and the
+         *                         replication operation can finish processing
+         *                         Note: this callback should be used in extreme situations, typically node shutdown.
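+     *                         (the replica failure itself has already been recorded in the operation's response at this point;
+     *                         this callback only decides how a failure of the shard-failed request itself is handled)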
+ */ + void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure); + } + + public static class RetryOnPrimaryException extends ElasticsearchException { + public RetryOnPrimaryException(ShardId shardId, String msg) { + this(shardId, msg, null); + } + + public RetryOnPrimaryException(ShardId shardId, String msg, Throwable cause) { + super(msg, cause); + setShard(shardId); + } + + public RetryOnPrimaryException(StreamInput in) throws IOException { + super(in); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 8383872cdae..2cc647ddd5a 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -20,14 +20,15 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -37,35 +38,28 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeClosedException; -import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import 
org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.EmptyTransportResponseHandler; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; @@ -75,14 +69,7 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; @@ -94,19 +81,20 @@ import java.util.function.Supplier; * primary node to validate request before primary operation followed by sampling state again for resolving * nodes with replica copies to perform replication. */ -public abstract class TransportReplicationAction, ReplicaRequest extends ReplicationRequest, Response extends ReplicationResponse> extends TransportAction { +public abstract class TransportReplicationAction, + ReplicaRequest extends ReplicationRequest, + Response extends ReplicationResponse> extends TransportAction { - protected final TransportService transportService; - protected final ClusterService clusterService; - protected final IndicesService indicesService; - protected final ShardStateAction shardStateAction; - protected final WriteConsistencyLevel defaultWriteConsistencyLevel; - protected final TransportRequestOptions transportOptions; + final protected TransportService transportService; + final protected ClusterService clusterService; + final protected IndicesService indicesService; + final private ShardStateAction shardStateAction; + final private WriteConsistencyLevel defaultWriteConsistencyLevel; + final private TransportRequestOptions transportOptions; - final String transportReplicaAction; - final String transportPrimaryAction; - final String executor; - final boolean checkWriteConsistency; + final private String transportReplicaAction; + final private String transportPrimaryAction; + final private ReplicasProxy replicasProxy; protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, IndicesService indicesService, @@ -122,16 +110,17 @@ public abstract class TransportReplicationAction shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception; + protected abstract Tuple shardOperationOnPrimary(Request shardRequest) throws Exception; /** * Replica operation on nodes with replica copies @@ -200,33 +193,8 @@ public abstract class TransportReplicationAction { @@ -287,7 +255,86 @@ public abstract class TransportReplicationAction(logger, channel, "rerouting indexing to target primary " + primary, + TransportReplicationAction.this::newResponseInstance) { + + @Override + public void handleResponse(Response response) { + setPhase(replicationTask, "finished"); + super.handleResponse(response); + } + + @Override + public void handleException(TransportException exp) { + setPhase(replicationTask, "finished"); + super.handleException(exp); + } + }); + } else { + setPhase(replicationTask, "primary"); + final IndexMetaData indexMetaData = 
clusterService.state().getMetaData().index(request.shardId().getIndex()); + final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings()); + final ActionListener listener = createResponseListener(channel, replicationTask, primaryShardReference); + createReplicatedOperation(request, listener, primaryShardReference, executeOnReplicas).execute(); + success = true; + } + } finally { + if (success == false) { + primaryShardReference.close(); + } + } + } + + protected ReplicationOperation + createReplicatedOperation(Request request, ActionListener listener, + PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { + return new ReplicationOperation<>(request, primaryShardReference, listener, + executeOnReplicas, checkWriteConsistency(), replicasProxy, clusterService::state, logger, actionName + ); + } + + private ActionListener createResponseListener(final TransportChannel channel, final ReplicationTask replicationTask, + final PrimaryShardReference primaryShardReference) { + return new ActionListener() { + @Override + public void onResponse(Response response) { + finish(); + try { + channel.sendResponse(response); + } catch (IOException e) { + onFailure(e); + } + } + + private void finish() { + primaryShardReference.close(); + setPhase(replicationTask, "finished"); + } + + @Override + public void onFailure(Throwable e) { + setPhase(replicationTask, "finished"); + primaryShardReference.close(); + try { + channel.sendResponse(e); + } catch (IOException e1) { + logger.warn("failed to send response", e); + } + } + }; } } @@ -344,7 +391,8 @@ public abstract class TransportReplicationAction handler = TransportChannelResponseHandler.emptyResponseHandler(logger, channel, extraMessage); + TransportChannelResponseHandler handler = + new TransportChannelResponseHandler<>(logger, channel, extraMessage, () -> TransportResponse.Empty.INSTANCE); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler); } @@ -359,12 +407,13 @@ public abstract class TransportReplicationAction - * Note that as soon as we move to replication action, state responsibility is transferred to {@link ReplicationPhase}. 
- */ - class PrimaryPhase extends AbstractRunnable { - private final ReplicationTask task; - private final Request request; - private final ShardId shardId; - private final TransportChannel channel; - private final ClusterState state; - private final AtomicBoolean finished = new AtomicBoolean(); - private IndexShardReference indexShardReference; - - PrimaryPhase(ReplicationTask task, Request request, TransportChannel channel) { - this.state = clusterService.state(); - this.task = task; - this.request = request; - assert request.shardId() != null : "request shardId must be set prior to primary phase"; - this.shardId = request.shardId(); - this.channel = channel; - } - - @Override - public void onFailure(Throwable e) { - if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { - if (logger.isTraceEnabled()) { - logger.trace("failed to execute [{}] on [{}]", e, request, shardId); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("failed to execute [{}] on [{}]", e, request, shardId); - } - } - finishAsFailed(e); - } - - @Override - protected void doRun() throws Exception { - setPhase(task, "primary"); - // request shardID was set in ReroutePhase - final String writeConsistencyFailure = checkWriteConsistency(shardId); - if (writeConsistencyFailure != null) { - finishBecauseUnavailable(shardId, writeConsistencyFailure); - return; - } - // closed in finishAsFailed(e) in the case of error - indexShardReference = getIndexShardReferenceOnPrimary(shardId, request); - if (indexShardReference.isRelocated() == false) { - executeLocally(); - } else { - executeRemotely(); - } - } - - private void executeLocally() throws Exception { - // execute locally - Tuple primaryResponse = shardOperationOnPrimary(state.metaData(), request); - primaryResponse.v2().primaryTerm(indexShardReference.opPrimaryTerm()); - if (logger.isTraceEnabled()) { - logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version()); - } - ReplicationPhase replicationPhase = new ReplicationPhase(task, primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference); - finishAndMoveToReplication(replicationPhase); - } - - private void executeRemotely() { - // delegate primary phase to relocation target - // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary - // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase. 
- final ShardRouting primary = indexShardReference.routingEntry(); - indexShardReference.close(); - assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary; - DiscoveryNode relocatingNode = state.nodes().get(primary.relocatingNodeId()); - transportService.sendRequest(relocatingNode, transportPrimaryAction, request, transportOptions, - TransportChannelResponseHandler.responseHandler(logger, TransportReplicationAction.this::newResponseInstance, channel, - "rerouting indexing to target primary " + primary)); - } - - /** - * checks whether we can perform a write based on the write consistency setting - * returns **null* if OK to proceed, or a string describing the reason to stop - */ - String checkWriteConsistency(ShardId shardId) { - if (checkWriteConsistency == false) { - return null; - } - - final WriteConsistencyLevel consistencyLevel; - if (request.consistencyLevel() != WriteConsistencyLevel.DEFAULT) { - consistencyLevel = request.consistencyLevel(); - } else { - consistencyLevel = defaultWriteConsistencyLevel; - } - final int sizeActive; - final int requiredNumber; - IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(shardId.getIndexName()); - if (indexRoutingTable != null) { - IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId()); - if (shardRoutingTable != null) { - sizeActive = shardRoutingTable.activeShards().size(); - if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardRoutingTable.getSize() > 2) { - // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to) - requiredNumber = (shardRoutingTable.getSize() / 2) + 1; - } else if (consistencyLevel == WriteConsistencyLevel.ALL) { - requiredNumber = shardRoutingTable.getSize(); - } else { - requiredNumber = 1; - } - } else { - sizeActive = 0; - requiredNumber = 1; - } - } else { - sizeActive = 0; - requiredNumber = 1; - } - - if (sizeActive < requiredNumber) { - logger.trace("not enough active copies of shard [{}] to meet write consistency of [{}] (have {}, needed {}), scheduling a retry. 
action [{}], request [{}]", - shardId, consistencyLevel, sizeActive, requiredNumber, transportPrimaryAction, request); - return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " + requiredNumber + ")."; - } else { - return null; - } - } - - /** - * upon success, finish the first phase and transfer responsibility to the {@link ReplicationPhase} - */ - void finishAndMoveToReplication(ReplicationPhase replicationPhase) { - if (finished.compareAndSet(false, true)) { - replicationPhase.run(); - } else { - assert false : "finishAndMoveToReplication called but operation is already finished"; - } - } - - /** - * upon failure, send failure back to the {@link ReroutePhase} for retrying if appropriate - */ - void finishAsFailed(Throwable failure) { - if (finished.compareAndSet(false, true)) { - setPhase(task, "failed"); - Releasables.close(indexShardReference); - logger.trace("operation failed", failure); - try { - channel.sendResponse(failure); - } catch (IOException responseException) { - logger.warn("failed to send error message back to client for action [{}]", responseException, transportPrimaryAction); - } - } else { - assert false : "finishAsFailed called but operation is already finished"; - } - } - - void finishBecauseUnavailable(ShardId shardId, String message) { - finishAsFailed(new UnavailableShardsException(shardId, "{} Timeout: [{}], request: [{}]", message, request.timeout(), request)); - } - } - /** * returns a new reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally - * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationPhase}). + * and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}). */ - protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) { + protected PrimaryShardReference getPrimaryShardReference(ShardId shardId) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); // we may end up here if the cluster state used to route the primary is so stale that the underlying // index shard was replaced with a replica. For example - in a two node cluster, if the primary fails // the replica will take over and a replica will be assigned to the first node. if (indexShard.routingEntry().primary() == false) { - throw new RetryOnPrimaryException(indexShard.shardId(), "actual shard is not a primary " + indexShard.routingEntry()); + throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(), + "actual shard is not a primary " + indexShard.routingEntry()); } - return IndexShardReferenceImpl.createOnPrimary(indexShard); + return new PrimaryShardReference(indexShard, indexShard.acquirePrimaryOperationLock()); } /** - * returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as + * Acquire an operation on replicas. The lock is closed as soon as * replication is completed on the node. 
*/ - protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) { + protected Releasable acquireReplicaOperationLock(ShardId shardId, long primaryTerm) { IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); IndexShard indexShard = indexService.getShard(shardId.id()); - IndexShardReference ref = IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm); - return ref; + return indexShard.acquireReplicaOperationLock(primaryTerm); } /** - * Responsible for sending replica requests (see {@link AsyncReplicaAction}) to nodes with replica copy, including - * relocating copies - */ - final class ReplicationPhase extends AbstractRunnable { - - private final ReplicationTask task; - private final ReplicaRequest replicaRequest; - private final Response finalResponse; - private final TransportChannel channel; - private final ShardId shardId; - private final List shards; - private final DiscoveryNodes nodes; - private final boolean executeOnReplica; - private final AtomicBoolean finished = new AtomicBoolean(); - private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard - private final ConcurrentMap shardReplicaFailures = ConcurrentCollections.newConcurrentMap(); - private final AtomicInteger pending; - private final int totalShards; - private final IndexShardReference indexShardReference; - - public ReplicationPhase(ReplicationTask task, ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId, - TransportChannel channel, IndexShardReference indexShardReference) { - this.task = task; - this.replicaRequest = replicaRequest; - this.channel = channel; - this.finalResponse = finalResponse; - this.indexShardReference = indexShardReference; - this.shardId = shardId; - - // we have to get a new state after successfully indexing into the primary in order to honour recovery semantics. - // we have to make sure that every operation indexed into the primary after recovery start will also be replicated - // to the recovery target. If we use an old cluster state, we may miss a relocation that has started since then. - // If the index gets deleted after primary operation, we skip replication - final ClusterState state = clusterService.state(); - final IndexShardRoutingTable shardRoutingTable = state.getRoutingTable().shardRoutingTableOrNull(shardId); - final IndexMetaData indexMetaData = state.getMetaData().index(shardId.getIndex()); - List shards = shards(shardRoutingTable); - boolean executeOnReplica = (indexMetaData == null) || shouldExecuteReplication(indexMetaData.getSettings()); - DiscoveryNodes nodes = state.getNodes(); - - if (shards.isEmpty()) { - logger.debug("replication phase for request [{}] on [{}] is skipped due to index deletion after primary operation", replicaRequest, shardId); - } - - // we calculate number of target nodes to send replication operations, including nodes with relocating shards - AtomicInteger numberOfPendingShardInstances = new AtomicInteger(); - this.totalShards = countTotalAndPending(shards, executeOnReplica, nodes, numberOfPendingShardInstances); - this.pending = numberOfPendingShardInstances; - this.shards = shards; - this.executeOnReplica = executeOnReplica; - this.nodes = nodes; - if (logger.isTraceEnabled()) { - logger.trace("replication phase started. 
pending [{}], action [{}], request [{}], cluster state version used [{}]", pending.get(), - transportReplicaAction, replicaRequest, state.version()); - } - } - - private int countTotalAndPending(List shards, boolean executeOnReplica, DiscoveryNodes nodes, AtomicInteger pending) { - assert pending.get() == 0; - int numberOfIgnoredShardInstances = performOnShards(shards, executeOnReplica, nodes, shard -> pending.incrementAndGet(), shard -> pending.incrementAndGet()); - // one for the local primary copy - return 1 + numberOfIgnoredShardInstances + pending.get(); - } - - private int performOnShards(List shards, boolean executeOnReplica, DiscoveryNodes nodes, Consumer onLocalShard, Consumer onRelocatingShard) { - int numberOfIgnoredShardInstances = 0; - for (ShardRouting shard : shards) { - if (shard.primary() == false && executeOnReplica == false) { - // If the replicas use shadow replicas, there is no reason to - // perform the action on the replica, so skip it and - // immediately return - - // this delays mapping updates on replicas because they have - // to wait until they get the new mapping through the cluster - // state, which is why we recommend pre-defined mappings for - // indices using shadow replicas - numberOfIgnoredShardInstances++; - continue; - } - if (shard.unassigned()) { - numberOfIgnoredShardInstances++; - continue; - } - // we index on a replica that is initializing as well since we might not have got the event - // yet that it was started. We will get an exception IllegalShardState exception if its not started - // and that's fine, we will ignore it - - // we never execute replication operation locally as primary operation has already completed locally - // hence, we ignore any local shard for replication - if (nodes.getLocalNodeId().equals(shard.currentNodeId()) == false) { - onLocalShard.accept(shard); - } - // send operation to relocating shard - // local shard can be a relocation target of a primary that is in relocated state - if (shard.relocating() && nodes.getLocalNodeId().equals(shard.relocatingNodeId()) == false) { - onRelocatingShard.accept(shard); - } - } - return numberOfIgnoredShardInstances; - } - - private List shards(IndexShardRoutingTable shardRoutingTable) { - return (shardRoutingTable != null) ? shardRoutingTable.shards() : Collections.emptyList(); - } - - /** - * total shard copies - */ - int totalShards() { - return totalShards; - } - - /** - * total successful operations so far - */ - int successful() { - return success.get(); - } - - /** - * number of pending operations - */ - int pending() { - return pending.get(); - } - - @Override - public void onFailure(Throwable t) { - logger.error("unexpected error while replicating for action [{}]. shard [{}]. 
", t, actionName, shardId); - forceFinishAsFailed(t); - } - - /** - * start sending replica requests to target nodes - */ - @Override - protected void doRun() { - setPhase(task, "replicating"); - if (pending.get() == 0) { - doFinish(); - return; - } - performOnShards(shards, executeOnReplica, nodes, shard -> performOnReplica(shard), shard -> performOnReplica(shard.buildTargetRelocatingShard())); - } - - /** - * send replica operation to target node - */ - void performOnReplica(final ShardRouting shard) { - // if we don't have that node, it means that it might have failed and will be created again, in - // this case, we don't have to do the operation, and just let it failover - String nodeId = shard.currentNodeId(); - if (!nodes.nodeExists(nodeId)) { - logger.trace("failed to send action [{}] on replica [{}] for request [{}] due to unknown node [{}]", transportReplicaAction, shard.shardId(), replicaRequest, nodeId); - onReplicaFailure(nodeId, null); - return; - } - if (logger.isTraceEnabled()) { - logger.trace("send action [{}] on replica [{}] for request [{}] to [{}]", transportReplicaAction, shard.shardId(), replicaRequest, nodeId); - } - - final DiscoveryNode node = nodes.get(nodeId); - transportService.sendRequest(node, transportReplicaAction, replicaRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { - @Override - public void handleResponse(TransportResponse.Empty vResponse) { - onReplicaSuccess(); - } - - @Override - public void handleException(TransportException exp) { - logger.trace("[{}] transport failure during replica request [{}], action [{}]", exp, node, replicaRequest, transportReplicaAction); - if (ignoreReplicaException(exp)) { - onReplicaFailure(nodeId, exp); - } else { - String message = String.format(Locale.ROOT, "failed to perform %s on replica on node %s", transportReplicaAction, node); - logger.warn("[{}] {}", exp, shardId, message); - shardStateAction.shardFailed( - shard, - indexShardReference.routingEntry(), - message, - exp, - new ShardStateAction.Listener() { - @Override - public void onSuccess() { - onReplicaFailure(nodeId, exp); - } - - @Override - public void onFailure(Throwable shardFailedError) { - if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { - String message = "unknown"; - try { - ShardRouting primaryShard = indexShardReference.routingEntry(); - message = String.format(Locale.ROOT, "primary shard [%s] was demoted while failing replica shard [%s] for [%s]", primaryShard, shard, exp); - // we are no longer the primary, fail ourselves and start over - indexShardReference.failShard(message, shardFailedError); - } catch (Throwable t) { - shardFailedError.addSuppressed(t); - } - forceFinishAsFailed(new RetryOnPrimaryException(shardId, message, shardFailedError)); - } else { - // these can occur if the node is shutting down and are okay - // any other exception here is not expected and merits investigation - assert shardFailedError instanceof TransportException || - shardFailedError instanceof NodeClosedException : shardFailedError; - onReplicaFailure(nodeId, exp); - } - } - } - ); - } - } - } - ); - } - - void onReplicaFailure(String nodeId, @Nullable Throwable e) { - // Only version conflict should be ignored from being put into the _shards header? 
-            if (e != null && ignoreReplicaException(e) == false) {
-                shardReplicaFailures.put(nodeId, e);
-            }
-            decPendingAndFinishIfNeeded();
-        }
-
-        void onReplicaSuccess() {
-            success.incrementAndGet();
-            decPendingAndFinishIfNeeded();
-        }
-
-        private void decPendingAndFinishIfNeeded() {
-            if (pending.decrementAndGet() <= 0) {
-                doFinish();
-            }
-        }
-
-        private void forceFinishAsFailed(Throwable t) {
-            setPhase(task, "failed");
-            if (finished.compareAndSet(false, true)) {
-                Releasables.close(indexShardReference);
-                try {
-                    channel.sendResponse(t);
-                } catch (IOException responseException) {
-                    logger.warn("failed to send error message back to client for action [{}]", responseException, transportReplicaAction);
-                    logger.warn("actual Exception", t);
-                }
-            }
-        }
-
-        private void doFinish() {
-            if (finished.compareAndSet(false, true)) {
-                setPhase(task, "finished");
-                Releasables.close(indexShardReference);
-                final ReplicationResponse.ShardInfo.Failure[] failuresArray;
-                if (!shardReplicaFailures.isEmpty()) {
-                    int slot = 0;
-                    failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
-                    for (Map.Entry<String, Throwable> entry : shardReplicaFailures.entrySet()) {
-                        RestStatus restStatus = ExceptionsHelper.status(entry.getValue());
-                        failuresArray[slot++] = new ReplicationResponse.ShardInfo.Failure(shardId, entry.getKey(), entry.getValue(), restStatus, false);
-                    }
-                } else {
-                    failuresArray = ReplicationResponse.EMPTY;
-                }
-                finalResponse.setShardInfo(new ReplicationResponse.ShardInfo(
-                        totalShards,
-                        success.get(),
-                        failuresArray
-                    )
-                );
-                if (logger.isTraceEnabled()) {
-                    logger.trace("finished replicating action [{}], request [{}], shardInfo [{}]", actionName, replicaRequest,
-                        finalResponse.getShardInfo());
-                }
-
-                try {
-                    channel.sendResponse(finalResponse);
-                } catch (IOException responseException) {
-                    logger.warn("failed to send error message back to client for action [{}]", responseException, transportReplicaAction);
-                }
-                if (logger.isTraceEnabled()) {
-                    logger.trace("action [{}] completed on all replicas [{}] for request [{}]", transportReplicaAction, shardId, replicaRequest);
-                }
-            }
-        }
-    }
-
     /**
-     * Indicated whether this operation should be replicated to shadow replicas or not. If this method returns true the replication phase will be skipped.
-     * For example writes such as index and delete don't need to be replicated on shadow replicas but refresh and flush do.
+     * Indicates whether this operation should be replicated to shadow replicas or not. If this method returns true the replication phase
+     * will be skipped. For example writes such as index and delete don't need to be replicated on shadow replicas but refresh and flush do.
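+     * (shadow replicas read the segment files written by the primary from a shared filesystem, so document-level changes reach
+     * them without replication, while operations such as refresh and flush must still run on every shard copy)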
*/ protected boolean shouldExecuteReplication(Settings settings) { return IndexMetaData.isIndexUsingShadowReplicas(settings) == false; } - interface IndexShardReference extends Releasable { - boolean isRelocated(); - - void failShard(String reason, @Nullable Throwable e); - - ShardRouting routingEntry(); - - /** returns the primary term of the current operation */ - long opPrimaryTerm(); - } - - static final class IndexShardReferenceImpl implements IndexShardReference { + class PrimaryShardReference implements ReplicationOperation.Primary, Releasable { private final IndexShard indexShard; private final Releasable operationLock; - private IndexShardReferenceImpl(IndexShard indexShard, long primaryTerm) { + PrimaryShardReference(IndexShard indexShard, Releasable operationLock) { this.indexShard = indexShard; - if (primaryTerm < 0) { - operationLock = indexShard.acquirePrimaryOperationLock(); - } else { - operationLock = indexShard.acquireReplicaOperationLock(primaryTerm); - } - } - - static IndexShardReferenceImpl createOnPrimary(IndexShard indexShard) { - return new IndexShardReferenceImpl(indexShard, -1); - } - - static IndexShardReferenceImpl createOnReplica(IndexShard indexShard, long primaryTerm) { - return new IndexShardReferenceImpl(indexShard, primaryTerm); + this.operationLock = operationLock; } @Override @@ -1154,24 +736,71 @@ public abstract class TransportReplicationAction perform(Request request) throws Exception { + Tuple result = shardOperationOnPrimary(request); + result.v2().primaryTerm(indexShard.getPrimaryTerm()); + return result; } @Override public ShardRouting routingEntry() { return indexShard.routingEntry(); } + } + + final class ReplicasProxy implements ReplicationOperation.Replicas { @Override - public long opPrimaryTerm() { - return indexShard.getPrimaryTerm(); + public void performOn(ShardRouting replica, ReplicaRequest request, ActionListener listener) { + String nodeId = replica.currentNodeId(); + final DiscoveryNode node = clusterService.state().nodes().get(nodeId); + if (node == null) { + listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]")); + return; + } + transportService.sendRequest(node, transportReplicaAction, request, transportOptions, + new ActionListenerResponseHandler<>(listener, () -> TransportResponse.Empty.INSTANCE)); + } + + @Override + public void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable, + Runnable onSuccess, Consumer onFailure, Consumer onIgnoredFailure) { + shardStateAction.shardFailed( + replica, primary, message, throwable, + new ShardStateAction.Listener() { + @Override + public void onSuccess() { + onSuccess.run(); + } + + @Override + public void onFailure(Throwable shardFailedError) { + if (shardFailedError instanceof ShardStateAction.NoLongerPrimaryShardException) { + onFailure.accept(shardFailedError); + } else { + // these can occur if the node is shutting down and are okay + // any other exception here is not expected and merits investigation + assert shardFailedError instanceof TransportException || + shardFailedError instanceof NodeClosedException : shardFailedError; + onIgnoredFailure.accept(shardFailedError); + } + } + } + ); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index c2ae93f12dd..52721411f46 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ 
b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -313,7 +313,10 @@ public class IndexMetaData implements Diffable, FromXContentBuild /** * The term of the current selected primary. This is a non-negative number incremented when - * a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary + * a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary. + * + * Note: since we increment the term every time a shard is assigned, the term for any operational shard (i.e., a shard + * that can be indexed into) is larger than 0. * See {@link AllocationService#updateMetaDataWithRoutingTable(MetaData, RoutingTable, RoutingTable)}. **/ public long primaryTerm(int shardId) { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java index 24be5ecc3ab..5f302964099 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java @@ -28,46 +28,24 @@ import java.util.function.Supplier; /** * Base class for delegating transport response to a transport channel */ -public abstract class TransportChannelResponseHandler implements TransportResponseHandler { - - /** - * Convenience method for delegating an empty response to the provided transport channel - */ - public static TransportChannelResponseHandler emptyResponseHandler(ESLogger logger, - TransportChannel channel, - String extraInfoOnError) { - return new TransportChannelResponseHandler(logger, channel, extraInfoOnError) { - @Override - public TransportResponse.Empty newInstance() { - return TransportResponse.Empty.INSTANCE; - } - }; - } - - /** - * Convenience method for delegating a response provided by supplier to the provided transport channel - */ - public static TransportChannelResponseHandler responseHandler(ESLogger logger, - Supplier responseSupplier, - TransportChannel channel, - String extraInfoOnError) { - return new TransportChannelResponseHandler(logger, channel, extraInfoOnError) { - @Override - public T newInstance() { - return responseSupplier.get(); - } - }; - } - +public class TransportChannelResponseHandler implements TransportResponseHandler { private final ESLogger logger; private final TransportChannel channel; private final String extraInfoOnError; + private final Supplier responseSupplier; - protected TransportChannelResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) { + public TransportChannelResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError, + Supplier responseSupplier) { this.logger = logger; this.channel = channel; this.extraInfoOnError = extraInfoOnError; + this.responseSupplier = responseSupplier; + } + + @Override + public T newInstance() { + return responseSupplier.get(); } @Override diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 316618f169c..c5b0651025a 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.TimestampParsingException; import 
org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.client.AbstractClientHeadersTestCase; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -761,7 +762,7 @@ public class ExceptionSerializationTests extends ESTestCase { ids.put(114, org.elasticsearch.index.shard.IndexShardRecoveringException.class); ids.put(115, org.elasticsearch.index.translog.TranslogException.class); ids.put(116, org.elasticsearch.cluster.metadata.ProcessClusterEventTimeoutException.class); - ids.put(117, org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnPrimaryException.class); + ids.put(117, ReplicationOperation.RetryOnPrimaryException.class); ids.put(118, org.elasticsearch.ElasticsearchTimeoutException.class); ids.put(119, org.elasticsearch.search.query.QueryPhaseExecutionException.class); ids.put(120, org.elasticsearch.repositories.RepositoryVerificationException.class); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index f40ffcbc9e7..c498bf6ef5b 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -86,7 +86,7 @@ public class ClusterStateCreationUtils { } discoBuilder.localNodeId(newNode(0).getId()); discoBuilder.masterNodeId(newNode(1).getId()); // we need a non-local master to test shard failures - final int primaryTerm = randomInt(200); + final int primaryTerm = 1 + randomInt(200); IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder() .put(SETTING_VERSION_CREATED, Version.CURRENT) .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java new file mode 100644 index 00000000000..cc7558d1de8 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -0,0 +1,461 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.action.support.replication; + +import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ReplicationResponse; +import org.elasticsearch.action.UnavailableShardsException; +import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.shard.IndexShardNotStartedException; +import org.elasticsearch.index.shard.IndexShardState; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.TransportResponse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; +import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class ReplicationOperationTests extends ESTestCase { + + public void testReplication() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + + ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); + final long primaryTerm = state.getMetaData().index(index).primaryTerm(0); + final IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().shardRoutingTable(shardId); + ShardRouting primaryShard = indexShardRoutingTable.primaryShard(); + if (primaryShard.relocating() && randomBoolean()) { + // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated + state = ClusterState.builder(state) + .nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); + primaryShard = primaryShard.buildTargetRelocatingShard(); + } + + final Set expectedReplicas = getExpectedReplicas(shardId, state); + + final Map expectedFailures = new HashMap<>(); + final Set expectedFailedShards = new HashSet<>(); + for (ShardRouting replica : expectedReplicas) { + if (randomBoolean()) { + Throwable t; + boolean criticalFailure = randomBoolean(); + if (criticalFailure) { + t = new CorruptIndexException("simulated", (String) null); + } else { + t = new IndexShardNotStartedException(shardId, 
IndexShardState.RECOVERING); + } + logger.debug("--> simulating failure on {} with [{}]", replica, t.getClass().getSimpleName()); + expectedFailures.put(replica, t); + if (criticalFailure) { + expectedFailedShards.add(replica); + } + } + } + + Request request = new Request(shardId); + PlainActionFuture listener = new PlainActionFuture<>(); + final ClusterState finalState = state; + final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures); + final TestReplicationOperation op = new TestReplicationOperation(request, + new TestPrimary(primaryShard, primaryTerm), listener, replicasProxy, () -> finalState); + op.execute(); + + assertThat(request.primaryTerm(), equalTo(primaryTerm)); + assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); + assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); + assertThat(replicasProxy.failedReplicas, equalTo(expectedFailedShards)); + assertTrue("listener is not marked as done", listener.isDone()); + Response.ShardInfo shardInfo = listener.actionGet().getShardInfo(); + assertThat(shardInfo.getFailed(), equalTo(expectedFailedShards.size())); + assertThat(shardInfo.getFailures(), arrayWithSize(expectedFailedShards.size())); + assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size())); + final List unassignedShards = + indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED); + final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size(); + assertThat(shardInfo.getTotal(), equalTo(totalShards)); + } + + + public void testReplicationWithShadowIndex() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + + final ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); + final long primaryTerm = state.getMetaData().index(index).primaryTerm(0); + final IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().shardRoutingTable(shardId); + final ShardRouting primaryShard = indexShardRoutingTable.primaryShard(); + + Request request = new Request(shardId); + PlainActionFuture listener = new PlainActionFuture<>(); + final TestReplicationOperation op = new TestReplicationOperation(request, + new TestPrimary(primaryShard, primaryTerm), listener, false, false, + new TestReplicaProxy(), () -> state, logger, "test"); + op.execute(); + assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); + assertThat(request.processedOnReplicas, equalTo(Collections.emptySet())); + assertTrue("listener is not marked as done", listener.isDone()); + Response.ShardInfo shardInfo = listener.actionGet().getShardInfo(); + assertThat(shardInfo.getFailed(), equalTo(0)); + assertThat(shardInfo.getFailures(), arrayWithSize(0)); + assertThat(shardInfo.getSuccessful(), equalTo(1)); + assertThat(shardInfo.getTotal(), equalTo(indexShardRoutingTable.getSize())); + } + + + public void testDemotedPrimary() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + + ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(2), randomInt(2)); + final long primaryTerm = state.getMetaData().index(index).primaryTerm(0); + ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + if (primaryShard.relocating() && randomBoolean()) { + // simulate execution of the replication phase on the relocation target node after relocation source was 
marked as relocated + state = ClusterState.builder(state) + .nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); + primaryShard = primaryShard.buildTargetRelocatingShard(); + } + + final Set expectedReplicas = getExpectedReplicas(shardId, state); + + final Map expectedFailures = new HashMap<>(); + final ShardRouting failedReplica = randomFrom(new ArrayList<>(expectedReplicas)); + expectedFailures.put(failedReplica, new CorruptIndexException("simulated", (String) null)); + + Request request = new Request(shardId); + PlainActionFuture listener = new PlainActionFuture<>(); + final ClusterState finalState = state; + final TestReplicaProxy replicasProxy = new TestReplicaProxy(expectedFailures) { + @Override + public void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable, + Runnable onSuccess, Consumer onPrimaryDemoted, + Consumer onIgnoredFailure) { + assertThat(replica, equalTo(failedReplica)); + onPrimaryDemoted.accept(new ElasticsearchException("the king is dead")); + } + }; + AtomicBoolean primaryFailed = new AtomicBoolean(); + final TestPrimary primary = new TestPrimary(primaryShard, primaryTerm) { + @Override + public void failShard(String message, Throwable throwable) { + assertTrue(primaryFailed.compareAndSet(false, true)); + } + }; + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy, + () -> finalState); + op.execute(); + + assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); + assertTrue("listener is not marked as done", listener.isDone()); + assertTrue(primaryFailed.get()); + assertListenerThrows("should throw exception to trigger retry", listener, + ReplicationOperation.RetryOnPrimaryException.class); + } + + public void testAddedReplicaAfterPrimaryOperation() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + final ClusterState initialState = stateWithActivePrimary(index, true, 0); + final ClusterState stateWithAddedReplicas; + if (randomBoolean()) { + stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED, + randomBoolean() ? 
ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED); + } else { + stateWithAddedReplicas = state(index, true, ShardRoutingState.RELOCATING); + } + testClusterStateChangeAfterPrimaryOperation(shardId, initialState, stateWithAddedReplicas); + } + + public void testIndexDeletedAfterPrimaryOperation() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + final ClusterState initialState = state(index, true, ShardRoutingState.STARTED, ShardRoutingState.STARTED); + final ClusterState stateWithDeletedIndex = state(index + "_new", true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); + testClusterStateChangeAfterPrimaryOperation(shardId, initialState, stateWithDeletedIndex); + } + + + private void testClusterStateChangeAfterPrimaryOperation(final ShardId shardId, + final ClusterState initialState, + final ClusterState changedState) throws Exception { + AtomicReference state = new AtomicReference<>(initialState); + logger.debug("--> using initial state:\n{}", state.get().prettyPrint()); + final long primaryTerm = initialState.getMetaData().index(shardId.getIndexName()).primaryTerm(shardId.id()); + final ShardRouting primaryShard = state.get().routingTable().shardRoutingTable(shardId).primaryShard(); + final TestPrimary primary = new TestPrimary(primaryShard, primaryTerm) { + @Override + public Tuple perform(Request request) throws Exception { + final Tuple tuple = super.perform(request); + state.set(changedState); + logger.debug("--> state after primary operation:\n{}", state.get().prettyPrint()); + return tuple; + } + }; + + Request request = new Request(shardId); + PlainActionFuture listener = new PlainActionFuture<>(); + final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, + new TestReplicaProxy(), state::get); + op.execute(); + + assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); + Set expectedReplicas = getExpectedReplicas(shardId, state.get()); + assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); + } + + public void testWriteConsistency() throws Exception { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + final int assignedReplicas = randomInt(2); + final int unassignedReplicas = randomInt(2); + final int totalShards = 1 + assignedReplicas + unassignedReplicas; + final boolean passesWriteConsistency; + Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values())); + switch (request.consistencyLevel()) { + case ONE: + passesWriteConsistency = true; + break; + case DEFAULT: + case QUORUM: + if (totalShards <= 2) { + passesWriteConsistency = true; // primary is enough + } else { + passesWriteConsistency = assignedReplicas + 1 >= (totalShards / 2) + 1; + } + // we have to reset default (as the transport replication action will do) + request.consistencyLevel(WriteConsistencyLevel.QUORUM); + break; + case ALL: + passesWriteConsistency = unassignedReplicas == 0; + break; + default: + throw new RuntimeException("unknown consistency level [" + request.consistencyLevel() + "]"); + } + ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; + for (int i = 0; i < assignedReplicas; i++) { + replicaStates[i] = randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); + } + for (int i = assignedReplicas; i < replicaStates.length; i++) { + replicaStates[i] = ShardRoutingState.UNASSIGNED; + } + + final 
ClusterState state = state(index, true, ShardRoutingState.STARTED, replicaStates); + logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]." + " expecting op to [{}]. using state: \n{}", + request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, + passesWriteConsistency ? "succeed" : "retry", + state.prettyPrint()); + final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id()); + final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id()); + PlainActionFuture listener = new PlainActionFuture<>(); + final ShardRouting primaryShard = shardRoutingTable.primaryShard(); + final TestReplicationOperation op = new TestReplicationOperation(request, + new TestPrimary(primaryShard, primaryTerm), + listener, randomBoolean(), true, new TestReplicaProxy(), () -> state, logger, "test"); + + if (passesWriteConsistency) { + assertThat(op.checkWriteConsistency(), nullValue()); + op.execute(); + assertTrue("operations should have been performed, consistency level is met", + request.processedOnPrimary.get()); + } else { + assertThat(op.checkWriteConsistency(), notNullValue()); + op.execute(); + assertFalse("operations should not have been performed, consistency level is *NOT* met", + request.processedOnPrimary.get()); + assertListenerThrows("should throw exception to trigger retry", listener, UnavailableShardsException.class); + } + } + + private Set getExpectedReplicas(ShardId shardId, ClusterState state) { + Set expectedReplicas = new HashSet<>(); + String localNodeId = state.nodes().getLocalNodeId(); + if (state.routingTable().hasIndex(shardId.getIndexName())) { + for (ShardRouting shardRouting : state.routingTable().shardRoutingTable(shardId)) { + if (shardRouting.unassigned()) { + continue; + } + if (localNodeId.equals(shardRouting.currentNodeId()) == false) { + expectedReplicas.add(shardRouting); + } + + if (shardRouting.relocating() && localNodeId.equals(shardRouting.relocatingNodeId()) == false) { + expectedReplicas.add(shardRouting.buildTargetRelocatingShard()); + } + } + } + return expectedReplicas; + } + + + public static class Request extends ReplicationRequest { + public AtomicBoolean processedOnPrimary = new AtomicBoolean(); + public Set processedOnReplicas = ConcurrentCollections.newConcurrentSet(); + + public Request() { + } + + Request(ShardId shardId) { + this(); + this.shardId = shardId; + this.index = shardId.getIndexName(); + // keep things simple + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + } + } + + static class Response extends ReplicationResponse { + } + + static class TestPrimary implements ReplicationOperation.Primary { + final ShardRouting routing; + final long term; + + TestPrimary(ShardRouting routing, long term) { + this.routing = routing; + this.term = term; + } + + @Override + public ShardRouting routingEntry() { + return routing; + } + + @Override + public void failShard(String message, Throwable throwable) { + throw new AssertionError("primary shouldn't be failed with [" + message + "]", throwable); + } + + @Override + public Tuple perform(Request request) throws Exception { + if (request.processedOnPrimary.compareAndSet(false, true) == false) { + fail("processed [" + request + "] twice"); + } + request.primaryTerm(term); + return new Tuple<>(new Response(), request); + } + } + + static 
class TestReplicaProxy implements ReplicationOperation.Replicas { + + final Map opFailures; + + final Set failedReplicas = ConcurrentCollections.newConcurrentSet(); + + TestReplicaProxy() { + this(Collections.emptyMap()); + } + + TestReplicaProxy(Map opFailures) { + this.opFailures = opFailures; + } + + @Override + public void performOn(ShardRouting replica, Request request, ActionListener listener) { + assertTrue("replica request processed twice on [" + replica + "]", request.processedOnReplicas.add(replica)); + if (opFailures.containsKey(replica)) { + listener.onFailure(opFailures.get(replica)); + } else { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + } + + @Override + public void failShard(ShardRouting replica, ShardRouting primary, String message, Throwable throwable, Runnable onSuccess, + Consumer onPrimaryDemoted, Consumer onIgnoredFailure) { + if (failedReplicas.add(replica) == false) { + fail("replica [" + replica + "] was failed twice"); + } + if (opFailures.containsKey(replica)) { + if (randomBoolean()) { + onSuccess.run(); + } else { + onIgnoredFailure.accept(new ElasticsearchException("simulated")); + } + } else { + fail("replica [" + replica + "] was failed"); + } + } + } + + class TestReplicationOperation extends ReplicationOperation { + public TestReplicationOperation(Request request, Primary primary, ActionListener listener, + Replicas replicas, Supplier clusterStateSupplier) { + this(request, primary, listener, true, false, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test"); + } + + public TestReplicationOperation(Request request, Primary primary, ActionListener listener, + boolean executeOnReplicas, boolean checkWriteConsistency, Replicas replicas, + Supplier clusterStateSupplier, ESLogger logger, String opType) { + super(request, primary, listener, executeOnReplicas, checkWriteConsistency, replicas, clusterStateSupplier, logger, opType); + } + } + + void assertListenerThrows(String msg, PlainActionFuture listener, Class klass) throws InterruptedException { + try { + listener.get(); + fail(msg); + } catch (ExecutionException ex) { + assertThat(ex.getCause(), instanceOf(klass)); + } + } + +} diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 2efc51f0d83..a10ce35ca41 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -18,16 +18,14 @@ */ package org.elasticsearch.action.support.replication; -import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.WriteConsistencyLevel; -import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlock; import 
org.elasticsearch.cluster.block.ClusterBlockException; @@ -38,9 +36,9 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; @@ -48,20 +46,22 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.EngineClosedException; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; -import org.elasticsearch.index.shard.IndexShardNotStartedException; -import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; @@ -72,35 +72,29 @@ import org.junit.Before; import org.junit.BeforeClass; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.stream.Collectors; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.elasticsearch.cluster.service.ClusterServiceUtils.createClusterService; import static org.elasticsearch.cluster.service.ClusterServiceUtils.setState; import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.arrayWithSize; -import static org.hamcrest.Matchers.either; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; 
-import static org.hamcrest.Matchers.nullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TransportReplicationActionTests extends ESTestCase { @@ -130,7 +124,6 @@ public class TransportReplicationActionTests extends ESTestCase { transportService.start(); transportService.acceptIncomingRequests(); action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool); - count.set(1); } @After @@ -185,12 +178,13 @@ public class TransportReplicationActionTests extends ESTestCase { block = ClusterBlocks.builder() .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL)); setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block)); - assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener, ClusterBlockException.class); + assertListenerThrows("primary phase should fail operation when moving from a retryable block to a non-retryable one", listener, + ClusterBlockException.class); assertIndexShardUninitialized(); } public void assertIndexShardUninitialized() { - assertEquals(1, count.get()); + assertEquals(0, count.get()); } public void testNotStartedPrimary() throws InterruptedException, ExecutionException { @@ -227,7 +221,7 @@ public class TransportReplicationActionTests extends ESTestCase { assertThat(capturedRequests, notNullValue()); assertThat(capturedRequests.size(), equalTo(1)); assertThat(capturedRequests.get(0).action, equalTo("testAction[p]")); - assertIndexShardCounter(1); + assertIndexShardCounter(0); } /** @@ -262,7 +256,8 @@ public class TransportReplicationActionTests extends ESTestCase { assertFalse("cluster state too old didn't cause a retry", listener.isDone()); // finish relocation - ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId).shardsWithState(ShardRoutingState.INITIALIZING).get(0); + ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId) + .shardsWithState(ShardRoutingState.INITIALIZING).get(0); AllocationService allocationService = ESAllocationTestCase.createAllocationService(); RoutingAllocation.Result result = allocationService.applyStartedShards(state, Arrays.asList(relocationTarget)); ClusterState updatedState = ClusterState.builder(clusterService.state()).routingResult(result).build(); @@ -277,7 +272,7 @@ public class TransportReplicationActionTests extends ESTestCase { assertThat(capturedRequests, notNullValue()); assertThat(capturedRequests.size(), equalTo(1)); assertThat(capturedRequests.get(0).action, equalTo("testAction[p]")); - assertIndexShardCounter(1); + assertIndexShardCounter(0); } public void testUnknownIndexOrShardOnReroute() throws InterruptedException { @@ -352,7 +347,7 @@ public class TransportReplicationActionTests extends ESTestCase { new IndexNotFoundException(shardId.getIndex()), new IndexShardClosedException(shardId), new EngineClosedException(shardId), - new TransportReplicationAction.RetryOnPrimaryException(shardId, "hello") + new ReplicationOperation.RetryOnPrimaryException(shardId, "hello") ); } @@ -373,7 +368,8 @@ public class TransportReplicationActionTests extends ESTestCase { reroutePhase.run(); assertThat(request.shardId(), equalTo(shardId)); logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId); - final List capturedRequests = 
transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId); + final List capturedRequests = + transport.getCapturedRequestsByTargetNodeAndClear().get(primaryNodeId); assertThat(capturedRequests, notNullValue()); assertThat(capturedRequests.size(), equalTo(1)); if (clusterService.state().nodes().getLocalNodeId().equals(primaryNodeId)) { @@ -386,7 +382,7 @@ public class TransportReplicationActionTests extends ESTestCase { assertIndexShardUninitialized(); } - public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throws InterruptedException, ExecutionException { + public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); @@ -394,36 +390,51 @@ public class TransportReplicationActionTests extends ESTestCase { Request request = new Request(shardId).timeout("1ms"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - AtomicBoolean movedToReplication = new AtomicBoolean(); - TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener)) { + AtomicBoolean executed = new AtomicBoolean(); + Action.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler() { @Override - void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) { - super.finishAndMoveToReplication(replicationPhase); - movedToReplication.set(true); + protected ReplicationOperation createReplicatedOperation(Request request, ActionListener actionListener, + Action.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { + return new NoopReplicationOperation(request, actionListener) { + public void execute() throws Exception { + assertPhase(task, "primary"); + assertFalse(executed.getAndSet(true)); + super.execute(); + } + }; } }; ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); boolean executeOnPrimary = true; - if (primaryShard.relocating() && randomBoolean()) { // whether shard has been marked as relocated already (i.e. relocation completed) + // whether shard has been marked as relocated already (i.e. relocation completed) + if (primaryShard.relocating() && randomBoolean()) { isRelocated.set(true); - indexShardRouting.set(primaryShard); executeOnPrimary = false; } - primaryPhase.run(); - assertThat(request.processedOnPrimary.get(), equalTo(executeOnPrimary)); - assertThat(movedToReplication.get(), equalTo(executeOnPrimary)); - if (executeOnPrimary == false) { - final List requests = transport.capturedRequestsByTargetNode().get(primaryShard.relocatingNodeId()); + primaryPhase.messageReceived(request, createTransportChannel(listener), task); + if (executeOnPrimary) { + assertTrue(executed.get()); + assertTrue(listener.isDone()); + listener.get(); + assertPhase(task, "finished"); + } else { + assertFalse(executed.get()); + assertIndexShardCounter(0); // it should have been freed. 
+ final List requests = + transport.capturedRequestsByTargetNode().get(primaryShard.relocatingNodeId()); assertThat(requests, notNullValue()); assertThat(requests.size(), equalTo(1)); assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("testAction[p]")); - assertPhase(task, "primary"); - } else { - assertPhase(task, either(equalTo("finished")).or(equalTo("replicating"))); + assertPhase(task, "primary_delegation"); + transport.handleResponse(requests.get(0).requestId, new Response()); + assertTrue(listener.isDone()); + listener.get(); + assertPhase(task, "finished"); } } - public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws InterruptedException, ExecutionException { + public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); ClusterState state = state(index, true, ShardRoutingState.RELOCATING); @@ -434,224 +445,132 @@ public class TransportReplicationActionTests extends ESTestCase { Request request = new Request(shardId).timeout("1ms"); PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); - AtomicBoolean movedToReplication = new AtomicBoolean(); - TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener)) { + AtomicBoolean executed = new AtomicBoolean(); + Action.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler() { @Override - void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) { - super.finishAndMoveToReplication(replicationPhase); - movedToReplication.set(true); + protected ReplicationOperation createReplicatedOperation(Request request, ActionListener actionListener, + Action.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { + return new NoopReplicationOperation(request, actionListener) { + public void execute() throws Exception { + assertPhase(task, "primary"); + assertFalse(executed.getAndSet(true)); + super.execute(); + } + }; } }; - primaryPhase.run(); - assertThat("request was not processed on primary relocation target", request.processedOnPrimary.get(), equalTo(true)); - assertThat(movedToReplication.get(), equalTo(true)); - assertPhase(task, "replicating"); - } - - public void testAddedReplicaAfterPrimaryOperation() { - final String index = "test"; - final ShardId shardId = new ShardId(index, "_na_", 0); - // start with no replicas - setState(clusterService, stateWithActivePrimary(index, true, 0)); - logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); - final ClusterState stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED, randomBoolean() ? 
ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED); - ReplicationTask task = maybeTask(); - - final Action actionWithAddedReplicaAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { - @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { - final Tuple operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); - // add replicas after primary operation - setState(clusterService, stateWithAddedReplicas); - logger.debug("--> state after primary operation:\n{}", clusterService.state().prettyPrint()); - return operationOnPrimary; - } - }; - - Request request = new Request(shardId); - PlainActionFuture listener = new PlainActionFuture<>(); - TransportReplicationAction.PrimaryPhase primaryPhase = actionWithAddedReplicaAfterPrimaryOp.new PrimaryPhase(task, request, createTransportChannel(listener)); - primaryPhase.run(); - assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); - assertPhase(task, "replicating"); - Map> capturedRequestsByTargetNode = transport.getCapturedRequestsByTargetNodeAndClear(); - for (ShardRouting replica : stateWithAddedReplicas.getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards()) { - List requests = capturedRequestsByTargetNode.get(replica.currentNodeId()); - assertThat(requests, notNullValue()); - assertThat(requests.size(), equalTo(1)); - assertThat("replica request was not sent", requests.get(0).action, equalTo("testAction[r]")); - } - } - - public void testRelocatingReplicaAfterPrimaryOperation() { - final String index = "test"; - final ShardId shardId = new ShardId(index, "_na_", 0); - // start with a replica - setState(clusterService, state(index, true, ShardRoutingState.STARTED, randomBoolean() ? 
ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED)); - logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); - final ClusterState stateWithRelocatingReplica = state(index, true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); - - final Action actionWithRelocatingReplicasAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { - @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { - final Tuple operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); - // set replica to relocating - setState(clusterService, stateWithRelocatingReplica); - logger.debug("--> state after primary operation:\n{}", clusterService.state().prettyPrint()); - return operationOnPrimary; - } - }; - - Request request = new Request(shardId); - PlainActionFuture listener = new PlainActionFuture<>(); - ReplicationTask task = maybeTask(); - TransportReplicationAction.PrimaryPhase primaryPhase = actionWithRelocatingReplicasAfterPrimaryOp.new PrimaryPhase( - task, request, createTransportChannel(listener)); - primaryPhase.run(); - assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); - ShardRouting relocatingReplicaShard = stateWithRelocatingReplica.getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0); - Map> capturedRequestsByTargetNode = transport.getCapturedRequestsByTargetNodeAndClear(); - assertPhase(task, "replicating"); - for (String node : new String[]{relocatingReplicaShard.currentNodeId(), relocatingReplicaShard.relocatingNodeId()}) { - List requests = capturedRequestsByTargetNode.get(node); - assertThat(requests, notNullValue()); - assertThat(requests.size(), equalTo(1)); - assertThat("replica request was not sent to replica", requests.get(0).action, equalTo("testAction[r]")); - } - } - - public void testIndexDeletedAfterPrimaryOperation() { - final String index = "test"; - final ShardId shardId = new ShardId(index, "_na_", 0); - setState(clusterService, state(index, true, ShardRoutingState.STARTED, ShardRoutingState.STARTED)); - logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); - final ClusterState stateWithDeletedIndex = state(index + "_new", true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); - - final Action actionWithDeletedIndexAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { - @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { - final Tuple operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); - // delete index after primary op - setState(clusterService, stateWithDeletedIndex); - logger.debug("--> state after primary operation:\n{}", clusterService.state().prettyPrint()); - return operationOnPrimary; - } - }; - - Request request = new Request(shardId); - PlainActionFuture listener = new PlainActionFuture<>(); - ReplicationTask task = maybeTask(); - TransportReplicationAction.PrimaryPhase primaryPhase = actionWithDeletedIndexAfterPrimaryOp.new PrimaryPhase( - task, request, createTransportChannel(listener)); - primaryPhase.run(); - assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); - assertThat("replication phase should be skipped if index gets deleted after primary operation", transport.capturedRequestsByTargetNode().size(), 
equalTo(0)); + primaryPhase.messageReceived(request, createTransportChannel(listener), task); + assertThat(executed.get(), equalTo(true)); assertPhase(task, "finished"); } - public void testWriteConsistency() throws ExecutionException, InterruptedException { - action = new ActionWithConsistency(Settings.EMPTY, "testActionWithConsistency", transportService, clusterService, threadPool); - final String index = "test"; - final ShardId shardId = new ShardId(index, "_na_", 0); - final int assignedReplicas = randomInt(2); - final int unassignedReplicas = randomInt(2); - final int totalShards = 1 + assignedReplicas + unassignedReplicas; - final boolean passesWriteConsistency; - Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values())); - switch (request.consistencyLevel()) { - case ONE: - passesWriteConsistency = true; - break; - case DEFAULT: - case QUORUM: - if (totalShards <= 2) { - passesWriteConsistency = true; // primary is enough - } else { - passesWriteConsistency = assignedReplicas + 1 >= (totalShards / 2) + 1; - } - break; - case ALL: - passesWriteConsistency = unassignedReplicas == 0; - break; - default: - throw new RuntimeException("unknown consistency level [" + request.consistencyLevel() + "]"); - } - ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; - for (int i = 0; i < assignedReplicas; i++) { - replicaStates[i] = randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); - } - for (int i = assignedReplicas; i < replicaStates.length; i++) { - replicaStates[i] = ShardRoutingState.UNASSIGNED; - } + public void testPrimaryReference() throws Exception { + final IndexShard shard = mock(IndexShard.class); + final long primaryTerm = 1 + randomInt(200); + when(shard.getPrimaryTerm()).thenReturn(primaryTerm); - setState(clusterService, state(index, true, ShardRoutingState.STARTED, replicaStates)); - logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]. expecting op to [{}]. using state: \n{}", - request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, passesWriteConsistency ? 
"succeed" : "retry", - clusterService.state().prettyPrint()); + AtomicBoolean closed = new AtomicBoolean(); + Releasable releasable = () -> { + if (closed.compareAndSet(false, true) == false) { + fail("releasable is closed twice"); + } + }; + Action.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); + final Request request = new Request(); + Tuple result = primary.perform(request); - final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); - PlainActionFuture listener = new PlainActionFuture<>(); - ReplicationTask task = maybeTask(); - TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener)); - if (passesWriteConsistency) { - assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard().shardId()), nullValue()); - primaryPhase.run(); - assertTrue("operations should have been performed, consistency level is met", request.processedOnPrimary.get()); - if (assignedReplicas > 0) { - assertIndexShardCounter(2); - } else { - assertIndexShardCounter(1); - } - assertPhase(task, either(equalTo("finished")).or(equalTo("replicating"))); - } else { - assertThat(primaryPhase.checkWriteConsistency(shardRoutingTable.primaryShard().shardId()), notNullValue()); - primaryPhase.run(); - assertFalse("operations should not have been perform, consistency level is *NOT* met", request.processedOnPrimary.get()); - assertListenerThrows("should throw exception to trigger retry", listener, UnavailableShardsException.class); - assertIndexShardUninitialized(); - for (int i = 0; i < replicaStates.length; i++) { - replicaStates[i] = ShardRoutingState.STARTED; - } - setState(clusterService, state(index, true, ShardRoutingState.STARTED, replicaStates)); - listener = new PlainActionFuture<>(); - primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener)); - primaryPhase.run(); - assertTrue("once the consistency level met, operation should continue", request.processedOnPrimary.get()); - assertIndexShardCounter(2); - assertPhase(task, "replicating"); - } + assertThat(result.v2().primaryTerm(), equalTo(primaryTerm)); + + final ElasticsearchException exception = new ElasticsearchException("testing"); + primary.failShard("test", exception); + + verify(shard).failShard("test", exception); + + primary.close(); + + assertTrue(closed.get()); } - public void testReplication() throws ExecutionException, InterruptedException { + public void testReplicaProxy() throws InterruptedException, ExecutionException { + Action.ReplicasProxy proxy = action.new ReplicasProxy(); final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - - ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); - ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); - if (primaryShard.relocating() && randomBoolean()) { - // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated - state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); - } + ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(3), randomInt(2)); + logger.info("using state: {}", state.prettyPrint()); setState(clusterService, state); - final IndexShardRoutingTable shardRoutingTable = 
clusterService.state().routingTable().index(index).shard(shardId.id()); - int assignedReplicas = 0; - int totalShards = 0; - for (ShardRouting shard : shardRoutingTable) { - totalShards++; - if (shard.primary() == false && shard.assignedToNode()) { - assignedReplicas++; - } - if (shard.relocating()) { - assignedReplicas++; - totalShards++; - } + // check that at unknown node fails + PlainActionFuture listener = new PlainActionFuture<>(); + proxy.performOn( + TestShardRouting.newShardRouting(shardId.getIndex(), shardId.id(), "NOT THERE", false, + randomFrom(ShardRoutingState.values())), + new Request(), listener); + assertTrue(listener.isDone()); + assertListenerThrows("non existent node should throw a NoNodeAvailableException", listener, NoNodeAvailableException.class); + + final IndexShardRoutingTable shardRoutings = state.routingTable().shardRoutingTable(shardId); + final ShardRouting replica = randomFrom(shardRoutings.replicaShards().stream() + .filter(ShardRouting::assignedToNode).collect(Collectors.toList())); + listener = new PlainActionFuture<>(); + proxy.performOn(replica, new Request(), listener); + assertFalse(listener.isDone()); + + CapturingTransport.CapturedRequest[] captures = transport.getCapturedRequestsAndClear(); + assertThat(captures, arrayWithSize(1)); + if (randomBoolean()) { + transport.handleResponse(captures[0].requestId, TransportResponse.Empty.INSTANCE); + assertTrue(listener.isDone()); + listener.get(); + } else if (randomBoolean()) { + transport.handleRemoteError(captures[0].requestId, new ElasticsearchException("simulated")); + assertTrue(listener.isDone()); + assertListenerThrows("listener should reflect remote error", listener, ElasticsearchException.class); + } else { + transport.handleError(captures[0].requestId, new TransportException("simulated")); + assertTrue(listener.isDone()); + assertListenerThrows("listener should reflect remote error", listener, TransportException.class); } - runReplicateTest(state, shardRoutingTable, assignedReplicas, totalShards); + AtomicReference failure = new AtomicReference<>(); + AtomicReference ignoredFailure = new AtomicReference<>(); + AtomicBoolean success = new AtomicBoolean(); + proxy.failShard(replica, shardRoutings.primaryShard(), "test", new ElasticsearchException("simulated"), + () -> success.set(true), failure::set, ignoredFailure::set + ); + CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear(); + assertEquals(1, shardFailedRequests.length); + CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0]; + ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry) shardFailedRequest.request; + // the shard the request was sent to and the shard to be failed should be the same + assertEquals(shardRoutingEntry.getShardRouting(), replica); + if (randomBoolean()) { + // simulate success + transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE); + assertTrue(success.get()); + assertNull(failure.get()); + assertNull(ignoredFailure.get()); + + } else if (randomBoolean()) { + // simulate the primary has been demoted + transport.handleRemoteError(shardFailedRequest.requestId, + new ShardStateAction.NoLongerPrimaryShardException(shardRoutingEntry.getShardRouting().shardId(), + "shard-failed-test")); + assertFalse(success.get()); + assertNotNull(failure.get()); + assertNull(ignoredFailure.get()); + + } else { + // simulated an "ignored" exception + 
transport.handleRemoteError(shardFailedRequest.requestId, + new NodeClosedException(state.nodes().getLocalNode())); + assertFalse(success.get()); + assertNull(failure.get()); + assertNotNull(ignoredFailure.get()); + } } - public void testReplicationWithShadowIndex() throws ExecutionException, InterruptedException { + public void testShadowIndexDisablesReplication() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); @@ -661,431 +580,125 @@ public class TransportReplicationActionTests extends ESTestCase { settings.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true); metaData.put(IndexMetaData.builder(metaData.get(index)).settings(settings)); state = ClusterState.builder(state).metaData(metaData).build(); - - ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); - if (primaryShard.relocating() && randomBoolean()) { - // simulate execution of the primary phase on the relocation target node - state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); - } setState(clusterService, state); - - final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); - int assignedReplicas = 0; - int totalShards = 0; - for (ShardRouting shard : shardRoutingTable) { - totalShards++; - if (shard.primary() && shard.relocating()) { - assignedReplicas++; - totalShards++; + Action.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler() { + @Override + protected ReplicationOperation createReplicatedOperation(Request request, ActionListener actionListener, + Action.PrimaryShardReference primaryShardReference, + boolean executeOnReplicas) { + assertFalse(executeOnReplicas); + return new NoopReplicationOperation(request, actionListener); } - } - runReplicateTest(state, shardRoutingTable, assignedReplicas, totalShards); + }; + primaryPhase.messageReceived(new Request(shardId), createTransportChannel(new PlainActionFuture<>()), null); } - protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shardRoutingTable, int assignedReplicas, int totalShards) throws InterruptedException, ExecutionException { - final ShardIterator shardIt = shardRoutingTable.shardsIt(); - final ShardId shardId = shardIt.shardId(); - final Request request = new Request(shardId); - final long primaryTerm = randomInt(200); - request.primaryTerm(primaryTerm); - final PlainActionFuture listener = new PlainActionFuture<>(); - ReplicationTask task = maybeTask(); - logger.debug("expecting [{}] assigned replicas, [{}] total shards. 
-        logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint());
-
-        TransportReplicationAction.IndexShardReference reference = getOrCreateIndexShardOperationsCounter(0);
-
-        ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
-        indexShardRouting.set(primaryShard);
-
-        assertIndexShardCounter(2);
-        AtomicReference error = new AtomicReference<>();
-
-        TransportChannel channel = createTransportChannel(listener, error::set);
-        TransportReplicationAction.ReplicationPhase replicationPhase =
-            action.new ReplicationPhase(task, request, new Response(), request.shardId(), channel, reference);
-
-        assertThat(replicationPhase.totalShards(), equalTo(totalShards));
-        assertThat(replicationPhase.pending(), equalTo(assignedReplicas));
-        replicationPhase.run();
-        final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
-        assertPhase(task, either(equalTo("finished")).or(equalTo("replicating")));
-
-        HashMap nodesSentTo = new HashMap<>();
-        boolean executeOnReplica =
-            action.shouldExecuteReplication(clusterService.state().getMetaData().index(shardId.getIndex()).getSettings());
-        for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
-            // no duplicate requests
-            Request replicationRequest = (Request) capturedRequest.request;
-            assertNull(nodesSentTo.put(capturedRequest.node.getId(), replicationRequest));
-            // the request is hitting the correct shard
-            assertEquals(request.shardId, replicationRequest.shardId);
-        }
-
-        String localNodeId = clusterService.state().getNodes().getLocalNodeId();
-        // no request was sent to the local node
-        assertThat(nodesSentTo.keySet(), not(hasItem(localNodeId)));
-
-        // requests were sent to the correct shard copies
-        for (ShardRouting shard : clusterService.state().getRoutingTable().shardRoutingTable(shardId)) {
-            if (shard.primary() == false && executeOnReplica == false) {
-                continue;
-            }
-            if (shard.unassigned()) {
-                continue;
-            }
-            if (localNodeId.equals(shard.currentNodeId()) == false) {
-                assertThat(nodesSentTo.remove(shard.currentNodeId()), notNullValue());
-            }
-            if (shard.relocating() && localNodeId.equals(shard.relocatingNodeId()) == false) { // for relocating primaries, we replicate from target to source if source is marked as relocated
-                assertThat(nodesSentTo.remove(shard.relocatingNodeId()), notNullValue());
-            }
-        }
-
-        assertThat(nodesSentTo.entrySet(), is(empty()));
-
-        if (assignedReplicas > 0) {
-            assertThat("listener is done, but there are outstanding replicas", listener.isDone(), equalTo(false));
-        }
-        int pending = replicationPhase.pending();
-        int criticalFailures = 0; // failures that should fail the shard
-        int successful = 1;
-        List failures = new ArrayList<>();
-        for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
-            if (randomBoolean()) {
-                Throwable t;
-                boolean criticalFailure = randomBoolean();
-                if (criticalFailure) {
-                    t = new CorruptIndexException("simulated", (String) null);
-                    criticalFailures++;
-                } else {
-                    t = new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
-                }
-                logger.debug("--> simulating failure on {} with [{}]", capturedRequest.node, t.getClass().getSimpleName());
-                transport.handleRemoteError(capturedRequest.requestId, t);
-                if (criticalFailure) {
-                    CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
-                    assertEquals(1, shardFailedRequests.length);
-                    CapturingTransport.CapturedRequest shardFailedRequest = shardFailedRequests[0];
-                    // get the shard the request was sent to
-                    ShardRouting routing = clusterService.state().getRoutingNodes().node(capturedRequest.node.getId()).get(request.shardId.id());
-                    // and the shard that was requested to be failed
-                    ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry) shardFailedRequest.request;
-                    // the shard the request was sent to and the shard to be failed should be the same
-                    assertEquals(shardRoutingEntry.getShardRouting(), routing);
-                    failures.add(shardFailedRequest);
-                    int ternary = randomIntBetween(0, 2);
-                    if (ternary == 0) {
-                        // simulate master left and test that the shard failure is retried
-                        int numberOfRetries = randomIntBetween(1, 4);
-                        CapturingTransport.CapturedRequest currentRequest = shardFailedRequest;
-                        for (int retryNumber = 0; retryNumber < numberOfRetries; retryNumber++) {
-                            // force a new cluster state to simulate a new master having been elected
-                            setState(clusterService, ClusterState.builder(clusterService.state()));
-                            transport.handleRemoteError(currentRequest.requestId, new NotMasterException("shard-failed-test"));
-                            CapturingTransport.CapturedRequest[] retryRequests = transport.getCapturedRequestsAndClear();
-                            assertEquals(1, retryRequests.length);
-                            currentRequest = retryRequests[0];
-                        }
-                        // now simulate that the last retry succeeded
-                        transport.handleResponse(currentRequest.requestId, TransportResponse.Empty.INSTANCE);
-                    } else if (ternary == 1) {
-                        // simulate the primary has been demoted
-                        transport.handleRemoteError(shardFailedRequest.requestId, new ShardStateAction.NoLongerPrimaryShardException(shardRoutingEntry.getShardRouting().shardId(), "shard-failed-test"));
-                        // the primary should fail itself
-                        assertShardIsFailed();
-                        // we should see a retry on primary exception
-                        assertNotNull(error.get());
-                        assertThat(error.get(), instanceOf(TransportReplicationAction.RetryOnPrimaryException.class));
-                        return;
-                    } else if (ternary == 2) {
-                        transport.handleResponse(shardFailedRequest.requestId, TransportResponse.Empty.INSTANCE);
-                    } else {
-                        assert false;
-                    }
-                }
-            } else {
-                successful++;
-                transport.handleResponse(capturedRequest.requestId, TransportResponse.Empty.INSTANCE);
-            }
-            pending--;
-            assertThat(replicationPhase.pending(), equalTo(pending));
-            assertThat(replicationPhase.successful(), equalTo(successful));
-        }
-        assertThat(listener.isDone(), equalTo(true));
-        Response response = listener.get();
-        final ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
-        assertThat(shardInfo.getFailed(), equalTo(criticalFailures));
-        assertThat(shardInfo.getFailures(), arrayWithSize(criticalFailures));
-        assertThat(shardInfo.getSuccessful(), equalTo(successful));
-        assertThat(shardInfo.getTotal(), equalTo(totalShards));
-
-        assertThat("failed to see enough shard failures", failures.size(), equalTo(criticalFailures));
-        for (CapturingTransport.CapturedRequest capturedRequest : transport.capturedRequests()) {
-            assertThat(capturedRequest.action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME));
-        }
-        // all replicas have responded so the counter should be decreased again
-        assertIndexShardCounter(1);
-
-        // assert that nothing in the replica logic changes the primary term of the operation
-        assertThat(request.primaryTerm(), equalTo(primaryTerm));
-    }
-
     public void testCounterOnPrimary() throws Exception {
         final String index = "test";
         final ShardId shardId = new ShardId(index, "_na_", 0);
         // no replica, we only want to test on primary
         setState(clusterService, state(index, true,
             ShardRoutingState.STARTED));
         logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
-        Request request = new Request(shardId).timeout("100ms");
+        Request request = new Request(shardId);
         PlainActionFuture listener = new PlainActionFuture<>();
         ReplicationTask task = maybeTask();
+        int i = randomInt(3);
+        final boolean throwExceptionOnCreation = i == 1;
+        final boolean throwExceptionOnRun = i == 2;
+        final boolean respondWithError = i == 3;
+        Action.PrimaryOperationTransportHandler primaryPhase = action.new PrimaryOperationTransportHandler() {
-        /**
-         * Execute an action that is stuck in shard operation until a latch is counted down.
-         * That way we can start the operation, check if the counter was incremented and then unblock the operation
-         * again to see if the counter is decremented afterwards.
-         * TODO: I could also write an action that asserts that the counter is 2 in the shard operation.
-         * However, this failure would only become apparent once listener.get is called. Seems a little implicit.
-         * */
-        action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
-        final TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener));
-        Thread t = new Thread() {
             @Override
-            public void run() {
-                primaryPhase.run();
+            protected ReplicationOperation createReplicatedOperation(Request request, ActionListener<Response> listener,
+                                                                     Action.PrimaryShardReference primaryShardReference,
+                                                                     boolean executeOnReplicas) {
+                assertIndexShardCounter(1);
+                if (throwExceptionOnCreation) {
+                    throw new ElasticsearchException("simulated exception, during createReplicatedOperation");
+                }
+                return new NoopReplicationOperation(request, listener) {
+                    @Override
+                    public void execute() throws Exception {
+                        assertIndexShardCounter(1);
+                        assertPhase(task, "primary");
+                        if (throwExceptionOnRun) {
+                            throw new ElasticsearchException("simulated exception, during performOnPrimary");
+                        } else if (respondWithError) {
+                            this.finalResponseListener.onFailure(new ElasticsearchException("simulated exception, as a response"));
+                        } else {
+                            super.execute();
+                        }
+                    }
+                };
             }
         };
-        t.start();
-        // shard operation should be ongoing, so the counter is at 2
-        // we have to wait here because increment happens in thread
-        assertBusy(() -> assertIndexShardCounter(2));
-
-        assertThat(transport.capturedRequests().length, equalTo(0));
-        ((ActionWithDelay) action).countDownLatch.countDown();
-        t.join();
-        listener.get();
-        // operation finished, counter back to 0
-        assertIndexShardCounter(1);
-        assertThat(transport.capturedRequests().length, equalTo(0));
-        assertPhase(task, "finished");
-    }
-
-    public void testCounterIncrementedWhileReplicationOngoing() throws InterruptedException, ExecutionException, IOException {
-        final String index = "test";
-        final ShardId shardId = new ShardId(index, "_na_", 0);
-        // one replica to make sure replication is attempted
-        setState(clusterService, state(index, true,
-            ShardRoutingState.STARTED, ShardRoutingState.STARTED));
-        ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard();
-        indexShardRouting.set(primaryShard);
-        logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
-        Request request = new Request(shardId).timeout("100ms");
-        PlainActionFuture listener = new PlainActionFuture<>();
-        ReplicationTask task = maybeTask();
-
-        TransportReplicationAction.PrimaryPhase primaryPhase =
-            action.new PrimaryPhase(task, request, createTransportChannel(listener));
-        primaryPhase.run();
-        assertIndexShardCounter(2);
-        assertThat(transport.capturedRequests().length, equalTo(1));
-        // try once with successful response
-        transport.handleResponse(transport.capturedRequests()[0].requestId, TransportResponse.Empty.INSTANCE);
-        transport.clear();
-        assertIndexShardCounter(1);
+        try {
+            primaryPhase.messageReceived(request, createTransportChannel(listener), task);
+        } catch (ElasticsearchException e) {
+            if (throwExceptionOnCreation || throwExceptionOnRun) {
+                assertThat(e.getMessage(), containsString("simulated"));
+                assertIndexShardCounter(0);
+                return; // early terminate
+            } else {
+                throw e;
+            }
+        }
+        assertIndexShardCounter(0);
+        assertTrue(listener.isDone());
         assertPhase(task, "finished");
-        request = new Request(shardId).timeout("100ms");
-        task = maybeTask();
-        primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener));
-        primaryPhase.run();
-        assertIndexShardCounter(2);
-        assertPhase(task, "replicating");
-        CapturingTransport.CapturedRequest[] replicationRequests = transport.getCapturedRequestsAndClear();
-        assertThat(replicationRequests.length, equalTo(1));
-        // try with failure response
-        transport.handleRemoteError(replicationRequests[0].requestId, new CorruptIndexException("simulated", (String) null));
-        CapturingTransport.CapturedRequest[] shardFailedRequests = transport.getCapturedRequestsAndClear();
-        assertEquals(1, shardFailedRequests.length);
-        transport.handleResponse(shardFailedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
-        assertIndexShardCounter(1);
+        try {
+            listener.get();
+        } catch (ExecutionException e) {
+            if (respondWithError) {
+                Throwable cause = e.getCause();
+                assertThat(cause, instanceOf(ElasticsearchException.class));
+                assertThat(cause.getMessage(), containsString("simulated"));
+            } else {
+                throw e;
+            }
+        }
     }

     public void testReplicasCounter() throws Exception {
         final ShardId shardId = new ShardId("test", "_na_", 0);
         setState(clusterService, state(shardId.getIndexName(), true, ShardRoutingState.STARTED, ShardRoutingState.STARTED));
-        action = new ActionWithDelay(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
-        final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
+        boolean throwException = randomBoolean();
         final ReplicationTask task = maybeTask();
-        Thread t = new Thread() {
+        Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool) {
             @Override
-            public void run() {
-                try {
-                    replicaOperationTransportHandler.messageReceived(new Request().setShardId(shardId), createTransportChannel(new PlainActionFuture<>()), task);
-                } catch (Exception e) {
-                    logger.error("Failed", e);
+            protected void shardOperationOnReplica(Request request) {
+                assertIndexShardCounter(1);
+                assertPhase(task, "replica");
+                if (throwException) {
+                    throw new ElasticsearchException("simulated");
                 }
+                super.shardOperationOnReplica(request);
             }
         };
-        t.start();
-        // shard operation should be ongoing, so the counter is at 2
-        // we have to wait here because increment happens in thread
-        assertBusy(() -> assertIndexShardCounter(2));
-        ((ActionWithDelay) action).countDownLatch.countDown();
-        t.join();
+        final Action.ReplicaOperationTransportHandler replicaOperationTransportHandler = action.new ReplicaOperationTransportHandler();
+        try {
+            replicaOperationTransportHandler.messageReceived(new Request().setShardId(shardId),
+                createTransportChannel(new PlainActionFuture<>()), task);
+        } catch (ElasticsearchException e) {
+            assertThat(e.getMessage(), containsString("simulated"));
+            assertTrue(throwException);
+        }
         assertPhase(task, "finished");
         // operation should have finished and counter decreased because no outstanding replica requests
-        assertIndexShardCounter(1);
-        // now check if this also works if operation throws exception
-        action = new ActionWithExceptions(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
-        final Action.ReplicaOperationTransportHandler replicaOperationTransportHandlerForException = action.new ReplicaOperationTransportHandler();
-        try {
-            replicaOperationTransportHandlerForException.messageReceived(new Request(shardId), createTransportChannel(new PlainActionFuture<>()), task);
-            fail();
-        } catch (Throwable t2) {
-        }
-        assertIndexShardCounter(1);
-    }
-
-    public void testCounterDecrementedIfShardOperationThrowsException() throws InterruptedException, ExecutionException, IOException {
-        action = new ActionWithExceptions(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, threadPool);
-        final String index = "test";
-        final ShardId shardId = new ShardId(index, "_na_", 0);
-        setState(clusterService, state(index, true,
-            ShardRoutingState.STARTED, ShardRoutingState.STARTED));
-        logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
-        Request request = new Request(shardId).timeout("100ms");
-        PlainActionFuture listener = new PlainActionFuture<>();
-        ReplicationTask task = maybeTask();
-
-        TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(task, request, createTransportChannel(listener));
-        primaryPhase.run();
-        // no replica request should have been sent yet
-        assertThat(transport.capturedRequests().length, equalTo(0));
-        // no matter if the operation is retried or not, counter must be be back to 1
-        assertIndexShardCounter(1);
-        assertPhase(task, "failed");
-    }
-
-    public void testReroutePhaseRetriedAfterDemotedPrimary() {
-        final String index = "test";
-        final ShardId shardId = new ShardId(index, "_na_", 0);
-        boolean localPrimary = true;
-        setState(clusterService, state(index, localPrimary,
-            ShardRoutingState.STARTED, ShardRoutingState.STARTED));
-        Action action = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) {
-            @Override
-            protected void resolveRequest(MetaData metaData, String concreteIndex, Request request) {
-                request.setShardId(shardId);
-            }
-        };
-        Request request = new Request();
-        PlainActionFuture listener = new PlainActionFuture<>();
-
-        TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
-        reroutePhase.run();
-
-        // reroute phase should send primary action
-        CapturingTransport.CapturedRequest[] primaryRequests = transport.getCapturedRequestsAndClear();
-        assertThat(primaryRequests.length, equalTo(1));
-        assertThat(primaryRequests[0].action, equalTo("testAction" + (localPrimary ? "[p]" : "")));
-        AtomicReference error = new AtomicReference<>();
-        TransportChannel channel = createTransportChannel(listener, error::set);
-
-        // simulate primary action
-        TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(maybeTask(), request, channel);
-        primaryPhase.run();
-
-        // primary action should send replica request
-        CapturingTransport.CapturedRequest[] replicaRequests = transport.getCapturedRequestsAndClear();
-        assertThat(replicaRequests.length, equalTo(1));
-        assertThat(replicaRequests[0].action, equalTo("testAction[r]"));
-        indexShardRouting.set(clusterService.state().getRoutingTable().shardRoutingTable(shardId).primaryShard());
-
-        // simulate replica failure
-        transport.handleRemoteError(replicaRequests[0].requestId, new Exception("exception"));
-
-        // the primary should request replica failure
-        CapturingTransport.CapturedRequest[] replicaFailures = transport.getCapturedRequestsAndClear();
-        assertThat(replicaFailures.length, equalTo(1));
-        assertThat(replicaFailures[0].action, equalTo(ShardStateAction.SHARD_FAILED_ACTION_NAME));
-
-        // simulate demoted primary
-        transport.handleRemoteError(replicaFailures[0].requestId, new ShardStateAction.NoLongerPrimaryShardException(shardId, "demoted"));
-        assertTrue(isShardFailed.get());
-        assertTrue(listener.isDone());
-        assertNotNull(error.get());
-        assertThat(error.get(), instanceOf(TransportReplicationAction.RetryOnPrimaryException.class));
-        assertThat(error.get().getMessage(), containsString("was demoted while failing replica shard"));
-
-        // reroute phase sees the retry
-        transport.handleRemoteError(primaryRequests[0].requestId, error.get());
-
-        // publish a new cluster state
-        boolean localPrimaryOnRetry = randomBoolean();
-        setState(clusterService, state(index, localPrimaryOnRetry,
-            ShardRoutingState.STARTED, ShardRoutingState.STARTED));
-        CapturingTransport.CapturedRequest[] primaryRetry = transport.getCapturedRequestsAndClear();
-
-        // the request should be retried
-        assertThat(primaryRetry.length, equalTo(1));
-        assertThat(primaryRetry[0].action, equalTo("testAction" + (localPrimaryOnRetry ? "[p]" : "")));
+        assertIndexShardCounter(0);
     }

     private void assertIndexShardCounter(int expected) {
         assertThat(count.get(), equalTo(expected));
     }

-    private void assertShardIsFailed() {
-        assertTrue(isShardFailed.get());
-    }
-
     private final AtomicInteger count = new AtomicInteger(0);

     private final AtomicBoolean isRelocated = new AtomicBoolean(false);

-    private final AtomicBoolean isShardFailed = new AtomicBoolean();
-
-    private final AtomicReference indexShardRouting = new AtomicReference<>();
-
-    /**
-     * Returns testIndexShardOperationsCounter or initializes it if it was already created in this test run.
-     */
-    private synchronized TransportReplicationAction.IndexShardReference getOrCreateIndexShardOperationsCounter(long primaryTerm) {
-        count.incrementAndGet();
-        return new TransportReplicationAction.IndexShardReference() {
-            @Override
-            public boolean isRelocated() {
-                return isRelocated.get();
-            }
-
-            @Override
-            public void failShard(String reason, @Nullable Throwable e) {
-                isShardFailed.set(true);
-                if (randomBoolean()) {
-                    throw new ElasticsearchException("simulated");
-                }
-            }
-
-            @Override
-            public ShardRouting routingEntry() {
-                ShardRouting shardRouting = indexShardRouting.get();
-                assert shardRouting != null;
-                return shardRouting;
-            }
-
-            @Override
-            public long opPrimaryTerm() {
-                return primaryTerm;
-            }
-
-            @Override
-            public void close() {
-                count.decrementAndGet();
-            }
-        };
-    }
-
     /**
      * Sometimes build a ReplicationTask for tracking the phase of the
      * TransportReplicationAction. Since TransportReplicationAction has to work
@@ -1144,7 +757,8 @@ public class TransportReplicationActionTests extends ESTestCase {
                ThreadPool threadPool) {
             super(settings, actionName, transportService, clusterService, null, threadPool,
                 new ShardStateAction(settings, clusterService, transportService, null, null, threadPool),
-                new ActionFilters(new HashSet()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new, Request::new, ThreadPool.Names.SAME);
+                new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY),
+                Request::new, Request::new, ThreadPool.Names.SAME);
         }

         @Override
@@ -1153,7 +767,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         }

         @Override
-        protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception {
+        protected Tuple shardOperationOnPrimary(Request shardRequest) throws Exception {
             boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true);
             assert executedBefore == false : "request has already been executed on the primary";
             return new Tuple<>(new Response(), shardRequest);
@@ -1174,96 +788,52 @@
             return false;
         }

-        @Override
-        protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId, Request request) {
-            final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
-            return getOrCreateIndexShardOperationsCounter(indexMetaData.primaryTerm(shardId.id()));
-        }
-
-        protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long opPrimaryTerm) {
-            return getOrCreateIndexShardOperationsCounter(opPrimaryTerm);
-        }
-    }
-
-    class ActionWithConsistency extends Action {
-
-        ActionWithConsistency(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
-            super(settings, actionName, transportService, clusterService, threadPool);
-        }
-
-        @Override
-        protected boolean checkWriteConsistency() {
-            return true;
-        }
-    }
-
-    /**
-     * Throws exceptions when executed. Used for testing if the counter is correctly decremented in case an operation fails.
-     */
-    class ActionWithExceptions extends Action {
-
-        ActionWithExceptions(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) throws IOException {
-            super(settings, actionName, transportService, clusterService, threadPool);
-        }
-
-        @Override
-        protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) {
-            return throwException(shardRequest.shardId());
-        }
-
-        private Tuple throwException(ShardId shardId) {
-            try {
-                if (randomBoolean()) {
-                    // throw a generic exception
-                    // for testing on replica this will actually cause an NPE because it will make the shard fail but
-                    // for this we need an IndicesService which is null.
-                    throw new ElasticsearchException("simulated");
-                } else {
-                    // throw an exception which will cause retry on primary and be ignored on replica
-                    throw new IndexShardNotStartedException(shardId, IndexShardState.RECOVERING);
+        protected PrimaryShardReference getPrimaryShardReference(ShardId shardId) {
+            count.incrementAndGet();
+            return new PrimaryShardReference(null, null) {
+                @Override
+                public boolean isRelocated() {
+                    return isRelocated.get();
                 }
-            } catch (Exception e) {
-                logger.info("throwing ", e);
-                throw e;
-            }
+
+                @Override
+                public void failShard(String reason, @Nullable Throwable e) {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public ShardRouting routingEntry() {
+                    ShardRouting shardRouting = clusterService.state().getRoutingTable()
+                        .shardRoutingTable(shardId).primaryShard();
+                    assert shardRouting != null;
+                    return shardRouting;
+                }
+
+                @Override
+                public void close() {
+                    count.decrementAndGet();
+                }
+
+            };
         }

-        @Override
-        protected void shardOperationOnReplica(Request shardRequest) {
-            throwException(shardRequest.shardId());
+        protected Releasable acquireReplicaOperationLock(ShardId shardId, long primaryTerm) {
+            count.incrementAndGet();
+            return count::decrementAndGet;
         }
     }

-    /**
-     * Delays the operation until countDownLatch is counted down
-     */
-    class ActionWithDelay extends Action {
-        CountDownLatch countDownLatch = new CountDownLatch(1);
+    class NoopReplicationOperation extends ReplicationOperation {

-        ActionWithDelay(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) throws IOException {
-            super(settings, actionName, transportService, clusterService, threadPool);
+        public NoopReplicationOperation(Request request, ActionListener<Response> listener) {
+            super(request, null, listener, true, true, null, null, TransportReplicationActionTests.this.logger, "noop");
        }

         @Override
-        protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception {
-            awaitLatch();
-            return new Tuple<>(new Response(), shardRequest);
+        public void execute() throws Exception {
+            this.finalResponseListener.onResponse(new Response());
         }
-
-        private void awaitLatch() throws InterruptedException {
-            countDownLatch.await();
-            countDownLatch = new CountDownLatch(1);
-        }
-
-        @Override
-        protected void shardOperationOnReplica(Request shardRequest) {
-            try {
-                awaitLatch();
-            } catch (InterruptedException e) {
-            }
-        }
-    }

     /**
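
The counter assertions running through the tests above all pin down one invariant: every primary or replica operation acquires a shard reference before it runs and releases it exactly once, whether the operation succeeds, fails while being created, or fails while executing. The following sketch illustrates that acquire/release contract in plain Java; all names are hypothetical and it is an illustration, not code from this patch.

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

final class OperationCounterSketch {
    private final AtomicInteger activeOperations = new AtomicInteger();

    int active() {
        return activeOperations.get();
    }

    // Runs the operation inside an acquire/release pair, mirroring the
    // try/finally discipline the assertions above depend on.
    <T> T runGuarded(Callable<T> operation) throws Exception {
        activeOperations.incrementAndGet();     // acquire a reference before running
        try {
            return operation.call();
        } finally {
            activeOperations.decrementAndGet(); // always release, even on failure
        }
    }

    public static void main(String[] args) throws Exception {
        OperationCounterSketch counter = new OperationCounterSketch();
        counter.runGuarded(() -> "ok");
        assert counter.active() == 0 : "counter must return to its base value after success";
        try {
            counter.runGuarded(() -> { throw new IllegalStateException("simulated"); });
        } catch (IllegalStateException expected) {
            // the reference is still released on the failure path
        }
        assert counter.active() == 0 : "counter must return to its base value after failure";
        System.out.println("counter invariant holds");
    }
}
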
diff --git a/core/src/test/java/org/elasticsearch/index/mapper/timestamp/TimestampMappingTests.java b/core/src/test/java/org/elasticsearch/index/mapper/timestamp/TimestampMappingTests.java
index a828d7850f2..b562a9f19b4 100644
--- a/core/src/test/java/org/elasticsearch/index/mapper/timestamp/TimestampMappingTests.java
+++ b/core/src/test/java/org/elasticsearch/index/mapper/timestamp/TimestampMappingTests.java
@@ -132,32 +132,6 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
         assertThat(disabledMapper.timestampFieldMapper().enabled(), is(false));
     }

-    // Issue 4718: was throwing a TimestampParsingException: failed to parse timestamp [null]
-    public void testTimestampDefaultValue() throws Exception {
-        XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
-            .startObject("_timestamp")
-            .field("enabled", "yes")
-            .endObject()
-            .endObject().endObject();
-        XContentBuilder doc = XContentFactory.jsonBuilder()
-            .startObject()
-            .field("foo", "bar")
-            .endObject();
-
-        MetaData metaData = MetaData.builder().build();
-        DocumentMapper docMapper = createIndex("test").mapperService().documentMapperParser().parse("type", new CompressedXContent(mapping.string()));
-
-        MappingMetaData mappingMetaData = new MappingMetaData(docMapper);
-
-        IndexRequest request = new IndexRequest("test", "type", "1").source(doc);
-        request.process(metaData, mappingMetaData, true, "test");
-        assertThat(request.timestamp(), notNullValue());
-
-        // We should have less than one minute (probably some ms)
-        long delay = System.currentTimeMillis() - Long.parseLong(request.timestamp());
-        assertThat(delay, lessThanOrEqualTo(60000L));
-    }
-
     // Issue 4718: was throwing a TimestampParsingException: failed to parse timestamp [null]
     public void testTimestampMissingDefaultToEpochValue() throws Exception {
         XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
@@ -178,7 +152,7 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
         MappingMetaData mappingMetaData = new MappingMetaData(docMapper);

         IndexRequest request = new IndexRequest("test", "type", "1").source(doc);
-        request.process(metaData, mappingMetaData, true, "test");
+        request.process(mappingMetaData, true, "test");
         assertThat(request.timestamp(), notNullValue());
         assertThat(request.timestamp(), is(MappingMetaData.Timestamp.parseStringTimestamp("1970-01-01", Joda.forPattern("YYYY-MM-dd"))));
     }
@@ -203,7 +177,7 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
         MappingMetaData mappingMetaData = new MappingMetaData(docMapper);

         IndexRequest request = new IndexRequest("test", "type", "1").source(doc);
-        request.process(metaData, mappingMetaData, true, "test");
+        request.process(mappingMetaData, true, "test");
         assertThat(request.timestamp(), notNullValue());

         // We should have less than one minute (probably some ms)
@@ -281,7 +255,7 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
         MappingMetaData mappingMetaData = new MappingMetaData(docMapper);

         IndexRequest request = new IndexRequest("test", "type", "1").source(doc);
-        request.process(metaData, mappingMetaData, true, "test");
+        request.process(mappingMetaData, true, "test");
         assertThat(request.timestamp(), notNullValue());

@@ -407,7 +381,7 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
         XContentBuilder doc = XContentFactory.jsonBuilder().startObject().endObject();
         IndexRequest request = new IndexRequest("test", "type", "1").source(doc).timestamp("2015060210");
         MappingMetaData mappingMetaData = new MappingMetaData(docMapper);
-        request.process(metaData, mappingMetaData, true, "test");
+        request.process(mappingMetaData, true, "test");
         assertThat(request.timestamp(), is("1433239200000"));
     }

@@ -419,16 +393,15 @@ public class TimestampMappingTests extends ESSingleNodeTestCase {
         BytesReference source = XContentFactory.jsonBuilder().startObject().field("field", "value").endObject().bytes();
         // test with 2.x
         DocumentMapper currentMapper = createIndex("new-index").mapperService().documentMapperParser().parse("type", new CompressedXContent(mapping));
-        MetaData newMetaData = client().admin().cluster().prepareState().get().getState().getMetaData();
         // this works with 2.x
         IndexRequest request = new IndexRequest("new-index", "type", "1").source(source).timestamp("1970-01-01");
-        request.process(newMetaData, new MappingMetaData(currentMapper), true, "new-index");
+        request.process(new MappingMetaData(currentMapper), true, "new-index");

         // this fails with 2.x
         request = new IndexRequest("new-index", "type", "1").source(source).timestamp("1234567890");
         try {
-            request.process(newMetaData, new MappingMetaData(currentMapper), true, "new-index");
+            request.process(new MappingMetaData(currentMapper), true, "new-index");
         } catch (Exception e) {
             assertThat(e.getCause(), instanceOf(IllegalArgumentException.class));
             assertThat(e.getMessage(), containsString("failed to parse timestamp [1234567890]"));
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index be0dd053c3d..f21973d19db 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -46,7 +46,6 @@ import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.InternalClusterInfoService;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.SnapshotId;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.AllocationId;
@@ -1071,7 +1070,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         // we can't issue this request through a client because of the inconsistencies we created with the cluster state
         // doing it directly instead
         IndexRequest request = client().prepareIndex("test", "test", "0").setSource("{}").request();
-        request.process(MetaData.builder().put(test.getMetaData(), false).build(), null, false, "test");
+        request.process(null, false, "test");
         TransportIndexAction.executeIndexRequestOnPrimary(request, newShard, null);
         newShard.refresh("test");
         assertHitCount(client().prepareSearch().get(), 1);
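
The structure this patch moves toward can be illustrated with a minimal, self-contained sketch: the primary-side replication orchestration becomes a plain object whose collaborators (the primary operation and the replica proxy) are injected, so a unit test can drive it with stubs instead of a started transport and cluster. All names below are hypothetical; this is an illustration of the idea under assumed, simplified interfaces, not the actual ReplicationOperation API.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class ReplicationSketch<Req, Resp> {
    interface Primary<Req, Resp> { Resp perform(Req request) throws Exception; }
    interface Replicas<Req> { void perform(String replicaId, Req request, Runnable onSuccess); }

    private final Primary<Req, Resp> primary;
    private final Replicas<Req> replicas;
    private final Consumer<Resp> listener;
    private final AtomicInteger pendingActions = new AtomicInteger();

    ReplicationSketch(Primary<Req, Resp> primary, Replicas<Req> replicas, Consumer<Resp> listener) {
        this.primary = primary;
        this.replicas = replicas;
        this.listener = listener;
    }

    void execute(Req request, List<String> replicaIds) throws Exception {
        Resp response = primary.perform(request);    // 1. execute on the primary first
        pendingActions.set(replicaIds.size() + 1);   // 2. one slot per replica, plus the primary itself
        for (String replicaId : replicaIds) {
            replicas.perform(replicaId, request, () -> decPending(response));
        }
        decPending(response);                        // 3. the primary's own slot completes
    }

    private void decPending(Resp response) {
        if (pendingActions.decrementAndGet() == 0) {
            listener.accept(response);               // respond once every copy has acknowledged
        }
    }

    public static void main(String[] args) throws Exception {
        // Unit-test style usage: no networking; a stub primary and a stub replica proxy.
        ReplicationSketch<String, String> operation = new ReplicationSketch<>(
            req -> "indexed " + req,                 // stub primary operation
            (id, req, onSuccess) -> onSuccess.run(), // stub replica proxy that always acks
            resp -> System.out.println("finished: " + resp));
        operation.execute("doc-1", Arrays.asList("replica-0", "replica-1"));
    }
}

Because the orchestration holds no transport state, a test can substitute a replica proxy that fails, never answers, or reports a demoted primary, which is what makes testing these paths possible without networking.
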