From a3a49a12ef72782512b2ad36d5c1770747599ddd Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 27 Jan 2016 08:51:18 -0500 Subject: [PATCH] Illegal shard failure requests Today, shard failure requests are blindly handled on the master without any validation that the request is a legal request. A legal request is a shard failure request for which the shard requesting the failure is either the local allocation or the primary allocation. This is because shard failure requests are classified into only two sets: requests that correspond to shards that exist, and requests that correspond to shards that do not exist. Requests that correspond to shards that do not exist are immediately marked as successful (there is nothing to do), and requests that correspond to shards that do exist are sent to the allocation service for handling the failure. This pull request adds a third classification for shard failure requests to separate out illegal shard failure requests and enables the master to validate shard failure requests. The master communicates the illegality of a shard failure request via a new exception: NoLongerPrimaryShardException. This exception can be used by shard failure listeners to discover when they've sent a shard failure request that they were not allowed to send (e.g., if they are no longer the primary allocation for the shard). Closes #16275 --- .../elasticsearch/ElasticsearchException.java | 4 +- .../TransportReplicationAction.java | 36 +++--- .../cluster/ClusterStateTaskExecutor.java | 5 + .../action/shard/ShardStateAction.java | 104 +++++++++++---- .../cluster/routing/RoutingTable.java | 8 ++ .../cluster/IndicesClusterStateService.java | 40 +++--- .../ExceptionSerializationTests.java | 9 ++ .../TransportReplicationActionTests.java | 14 +- ...rdFailedClusterStateTaskExecutorTests.java | 122 ++++++++++++++---- .../action/shard/ShardStateActionTests.java | 63 ++++++--- .../DiscoveryWithServiceDisruptionsIT.java | 4 +- 11 files changed, 302 insertions(+), 107 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index e6dc7deff2b..dbbe98633ae 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -19,6 +19,7 @@ package org.elasticsearch; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -613,7 +614,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte RETRY_ON_REPLICA_EXCEPTION(org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnReplicaException.class, org.elasticsearch.action.support.replication.TransportReplicationAction.RetryOnReplicaException::new, 136), TYPE_MISSING_EXCEPTION(org.elasticsearch.indices.TypeMissingException.class, org.elasticsearch.indices.TypeMissingException::new, 137), FAILED_TO_COMMIT_CLUSTER_STATE_EXCEPTION(org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException::new, 140), - QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class, org.elasticsearch.index.query.QueryShardException::new, 141); + QUERY_SHARD_EXCEPTION(org.elasticsearch.index.query.QueryShardException.class, org.elasticsearch.index.query.QueryShardException::new, 141), + NO_LONGER_PRIMARY_SHARD_EXCEPTION(ShardStateAction.NoLongerPrimaryShardException.class, ShardStateAction.NoLongerPrimaryShardException::new, 142); final Class exceptionClass; final FunctionThatThrowsIOException constructor; diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index cd7d2871e7f..7f29954762d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -778,16 +778,15 @@ public abstract class TransportReplicationAction shards; private final DiscoveryNodes nodes; private final boolean executeOnReplica; - private final String indexUUID; private final AtomicBoolean finished = new AtomicBoolean(); private final AtomicInteger success = new AtomicInteger(1); // We already wrote into the primary shard private final ConcurrentMap shardReplicaFailures = ConcurrentCollections.newConcurrentMap(); private final AtomicInteger pending; private final int totalShards; - private final Releasable indexShardReference; + private final IndexShardReference indexShardReference; public ReplicationPhase(ReplicaRequest replicaRequest, Response finalResponse, ShardId shardId, - TransportChannel channel, Releasable indexShardReference) { + TransportChannel channel, IndexShardReference indexShardReference) { this.replicaRequest = replicaRequest; this.channel = channel; this.finalResponse = finalResponse; @@ -804,7 +803,6 @@ public abstract class TransportReplicationAction { return this == SUCCESS; } + public Throwable getFailure() { + assert !isSuccess(); + return failure; + } + /** * Handle the execution result with the provided consumers * @param onSuccess handler to invoke on success diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 4aca9a4e235..fa703881bb2 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.action.shard; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; @@ -28,8 +29,9 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.MasterNodeChangePredicate; import org.elasticsearch.cluster.NotMasterException; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; @@ -46,6 +48,7 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; @@ -60,6 +63,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.Map; @@ -125,17 +129,22 @@ public class ShardStateAction extends AbstractComponent { return ExceptionsHelper.unwrap(exp, MASTER_CHANNEL_EXCEPTIONS) != null; } - public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) { + /** + * Send a shard failed request to the master node to update the + * cluster state. + * + * @param shardRouting the shard to fail + * @param sourceShardRouting the source shard requesting the failure (must be the shard itself, or the primary shard) + * @param message the reason for the failure + * @param failure the underlying cause of the failure + * @param listener callback upon completion of the request + */ + public void shardFailed(final ShardRouting shardRouting, ShardRouting sourceShardRouting, final String message, @Nullable final Throwable failure, Listener listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); - ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure); + ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, sourceShardRouting, message, failure); sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardRoutingEntry, listener); } - public void resendShardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) { - logger.trace("{} re-sending failed shard [{}], index UUID [{}], reason [{}]", shardRouting.shardId(), failure, shardRouting, indexUUID, message); - shardFailed(shardRouting, indexUUID, message, failure, listener); - } - // visible for testing protected void waitForNewMasterAndRetry(String actionName, ClusterStateObserver observer, ShardRoutingEntry shardRoutingEntry, Listener listener) { observer.waitForNextChange(new ClusterStateObserver.Listener() { @@ -231,15 +240,15 @@ public class ShardStateAction extends AbstractComponent { // partition tasks into those that correspond to shards // that exist versus do not exist - Map> partition = - tasks.stream().collect(Collectors.partitioningBy(task -> shardExists(currentState, task))); + Map> partition = + tasks.stream().collect(Collectors.groupingBy(task -> validateTask(currentState, task))); // tasks that correspond to non-existent shards are marked // as successful - batchResultBuilder.successes(partition.get(false)); + batchResultBuilder.successes(partition.getOrDefault(ValidationResult.SHARD_MISSING, Collections.emptyList())); ClusterState maybeUpdatedState = currentState; - List tasksToFail = partition.get(true); + List tasksToFail = partition.getOrDefault(ValidationResult.VALID, Collections.emptyList()); try { List failedShards = tasksToFail @@ -257,6 +266,15 @@ public class ShardStateAction extends AbstractComponent { batchResultBuilder.failures(tasksToFail, t); } + partition + .getOrDefault(ValidationResult.SOURCE_INVALID, Collections.emptyList()) + .forEach(task -> batchResultBuilder.failure( + task, + new NoLongerPrimaryShardException( + task.getShardRouting().shardId(), + "source shard [" + task.sourceShardRouting + "] is neither the local allocation nor the primary allocation") + )); + return batchResultBuilder.build(maybeUpdatedState); } @@ -265,17 +283,36 @@ public class ShardStateAction extends AbstractComponent { return allocationService.applyFailedShards(currentState, failedShards); } - private boolean shardExists(ClusterState currentState, ShardRoutingEntry task) { + private enum ValidationResult { + VALID, + SOURCE_INVALID, + SHARD_MISSING + } + + private ValidationResult validateTask(ClusterState currentState, ShardRoutingEntry task) { + + // non-local requests + if (!task.shardRouting.isSameAllocation(task.sourceShardRouting)) { + IndexShardRoutingTable indexShard = currentState.getRoutingTable().shardRoutingTableOrNull(task.shardRouting.shardId()); + if (indexShard == null) { + return ValidationResult.SOURCE_INVALID; + } + ShardRouting primaryShard = indexShard.primaryShard(); + if (primaryShard == null || !primaryShard.isSameAllocation(task.sourceShardRouting)) { + return ValidationResult.SOURCE_INVALID; + } + } + RoutingNodes.RoutingNodeIterator routingNodeIterator = currentState.getRoutingNodes().routingNodeIter(task.getShardRouting().currentNodeId()); if (routingNodeIterator != null) { for (ShardRouting maybe : routingNodeIterator) { if (task.getShardRouting().isSameAllocation(maybe)) { - return true; + return ValidationResult.VALID; } } } - return false; + return ValidationResult.SHARD_MISSING; } @Override @@ -291,9 +328,9 @@ public class ShardStateAction extends AbstractComponent { } } - public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String message, Listener listener) { + public void shardStarted(final ShardRouting shardRouting, final String message, Listener listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); - ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, null); + ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, shardRouting, message, null); sendShardAction(SHARD_STARTED_ACTION_NAME, observer, shardRoutingEntry, listener); } @@ -360,16 +397,16 @@ public class ShardStateAction extends AbstractComponent { public static class ShardRoutingEntry extends TransportRequest { ShardRouting shardRouting; - String indexUUID = IndexMetaData.INDEX_UUID_NA_VALUE; + ShardRouting sourceShardRouting; String message; Throwable failure; public ShardRoutingEntry() { } - ShardRoutingEntry(ShardRouting shardRouting, String indexUUID, String message, @Nullable Throwable failure) { + ShardRoutingEntry(ShardRouting shardRouting, ShardRouting sourceShardRouting, String message, @Nullable Throwable failure) { this.shardRouting = shardRouting; - this.indexUUID = indexUUID; + this.sourceShardRouting = sourceShardRouting; this.message = message; this.failure = failure; } @@ -382,7 +419,7 @@ public class ShardStateAction extends AbstractComponent { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); shardRouting = readShardRoutingEntry(in); - indexUUID = in.readString(); + sourceShardRouting = readShardRoutingEntry(in); message = in.readString(); failure = in.readThrowable(); } @@ -391,18 +428,25 @@ public class ShardStateAction extends AbstractComponent { public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardRouting.writeTo(out); - out.writeString(indexUUID); + sourceShardRouting.writeTo(out); out.writeString(message); out.writeThrowable(failure); } @Override public String toString() { - return "" + shardRouting + ", indexUUID [" + indexUUID + "], message [" + message + "], failure [" + ExceptionsHelper.detailedMessage(failure) + "]"; + return String.format( + Locale.ROOT, + "failed shard [%s], source shard [%s], message [%s], failure [%s]", + shardRouting, + sourceShardRouting, + message, + ExceptionsHelper.detailedMessage(failure)); } } public interface Listener { + default void onSuccess() { } @@ -423,6 +467,20 @@ public class ShardStateAction extends AbstractComponent { */ default void onFailure(final Throwable t) { } + + } + + public static class NoLongerPrimaryShardException extends ElasticsearchException { + + public NoLongerPrimaryShardException(ShardId shardId, String msg) { + super(msg); + setShard(shardId); + } + + public NoLongerPrimaryShardException(StreamInput in) throws IOException { + super(in); + } + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 6d81556eb2c..58e3ed6b644 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Predicate; /** @@ -137,6 +138,13 @@ public class RoutingTable implements Iterable, Diffable Optional.ofNullable(irt.shard(shardId.getId()))) + .orElse(null); + } + public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException { RoutingTableValidation validation = validate(metaData); if (!validation.valid()) { diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index f5afec1d5e3..bd94fb8a123 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -306,7 +306,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { try { if (indexShard.recoverFromStore(nodes.localNode())) { - shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store", SHARD_STATE_ACTION_LISTENER); + shardStateAction.shardStarted(shardRouting, "after recovery from store", SHARD_STATE_ACTION_LISTENER); } } catch (Throwable t) { handleRecoveryFailure(indexService, shardRouting, true, t); @@ -634,7 +636,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { synchronized (mutex) { - failAndRemoveShard(shardRouting, shardFailure.indexUUID, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause); + failAndRemoveShard(shardRouting, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause); } }); } diff --git a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java index 50764eef65e..0b693aed56b 100644 --- a/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/ExceptionSerializationTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.TimestampParsingException; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.AbstractClientHeadersTestCase; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -591,7 +592,14 @@ public class ExceptionSerializationTests extends ESTestCase { assertEquals("foo", e.getHeader("foo").get(0)); assertEquals("bar", e.getHeader("foo").get(1)); assertSame(status, e.status()); + } + public void testNoLongerPrimaryShardException() throws IOException { + ShardId shardId = new ShardId(new Index(randomAsciiOfLength(4), randomAsciiOfLength(4)), randomIntBetween(0, Integer.MAX_VALUE)); + String msg = randomAsciiOfLength(4); + ShardStateAction.NoLongerPrimaryShardException ex = serialize(new ShardStateAction.NoLongerPrimaryShardException(shardId, msg)); + assertEquals(shardId, ex.getShardId()); + assertEquals(msg, ex.getMessage()); } public static class UnknownHeaderException extends ElasticsearchException { @@ -776,6 +784,7 @@ public class ExceptionSerializationTests extends ESTestCase { ids.put(139, null); ids.put(140, org.elasticsearch.discovery.Discovery.FailedToCommitClusterStateException.class); ids.put(141, org.elasticsearch.index.query.QueryShardException.class); + ids.put(142, ShardStateAction.NoLongerPrimaryShardException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index ce9343309a6..2e4e3cb475b 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -550,7 +550,7 @@ public class TransportReplicationActionTests extends ESTestCase { } } - runReplicateTest(shardRoutingTable, assignedReplicas, totalShards); + runReplicateTest(state, shardRoutingTable, assignedReplicas, totalShards); } public void testReplicationWithShadowIndex() throws ExecutionException, InterruptedException { @@ -581,18 +581,22 @@ public class TransportReplicationActionTests extends ESTestCase { totalShards++; } } - runReplicateTest(shardRoutingTable, assignedReplicas, totalShards); + runReplicateTest(state, shardRoutingTable, assignedReplicas, totalShards); } - protected void runReplicateTest(IndexShardRoutingTable shardRoutingTable, int assignedReplicas, int totalShards) throws InterruptedException, ExecutionException { + protected void runReplicateTest(ClusterState state, IndexShardRoutingTable shardRoutingTable, int assignedReplicas, int totalShards) throws InterruptedException, ExecutionException { final ShardIterator shardIt = shardRoutingTable.shardsIt(); final ShardId shardId = shardIt.shardId(); final Request request = new Request(shardId); final PlainActionFuture listener = new PlainActionFuture<>(); logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint()); - Releasable reference = getOrCreateIndexShardOperationsCounter(); + TransportReplicationAction.IndexShardReference reference = getOrCreateIndexShardOperationsCounter(); + + ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + indexShardRouting.set(primaryShard); + assertIndexShardCounter(2); // TODO: set a default timeout TransportReplicationAction.ReplicationPhase replicationPhase = @@ -755,6 +759,8 @@ public class TransportReplicationActionTests extends ESTestCase { // one replica to make sure replication is attempted clusterService.setState(state(index, true, ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard(); + indexShardRouting.set(primaryShard); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); Request request = new Request(shardId).timeout("100ms"); PlainActionFuture listener = new PlainActionFuture<>(); diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java index 4e8d1d9266d..3ad8d5013b2 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.action.shard; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; @@ -28,6 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; @@ -38,6 +40,9 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.DiscoveryService; +import org.elasticsearch.index.Index; import org.elasticsearch.test.ESAllocationTestCase; import org.junit.Before; @@ -45,12 +50,15 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.not; public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCase { @@ -119,9 +127,25 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa tasks.addAll(failingTasks); tasks.addAll(nonExistentTasks); ClusterStateTaskExecutor.BatchResult result = failingExecutor.execute(currentState, tasks); - Map taskResultMap = - failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> false)); - taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> true))); + Map taskResultMap = + failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.failure(new RuntimeException("simulated applyFailedShards failure")))); + taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success()))); + assertTaskResults(taskResultMap, result, currentState, false); + } + + public void testIllegalShardFailureRequests() throws Exception { + String reason = "test illegal shard failure requests"; + ClusterState currentState = createClusterStateWithStartedShards(reason); + List failingTasks = createExistingShards(currentState, reason); + List tasks = new ArrayList<>(); + for (ShardStateAction.ShardRoutingEntry failingTask : failingTasks) { + tasks.add(new ShardStateAction.ShardRoutingEntry(failingTask.getShardRouting(), randomInvalidSourceShard(currentState, failingTask.getShardRouting()), failingTask.message, failingTask.failure)); + } + Map taskResultMap = + tasks.stream().collect(Collectors.toMap( + Function.identity(), + task -> ClusterStateTaskExecutor.TaskResult.failure(new ShardStateAction.NoLongerPrimaryShardException(task.getShardRouting().shardId(), "source shard [" + task.sourceShardRouting + "] is neither the local allocation nor the primary allocation")))); + ClusterStateTaskExecutor.BatchResult result = executor.execute(currentState, tasks); assertTaskResults(taskResultMap, result, currentState, false); } @@ -156,17 +180,22 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa for (int i = 0; i < numberOfTasks; i++) { shardsToFail.add(randomFrom(failures)); } - return toTasks(shardsToFail, indexUUID, reason); + return toTasks(currentState, shardsToFail, indexUUID, reason); } private List createNonExistentShards(ClusterState currentState, String reason) { // add shards from a non-existent index - MetaData nonExistentMetaData = - MetaData.builder() - .put(IndexMetaData.builder("non-existent").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(numberOfReplicas)) - .build(); - RoutingTable routingTable = RoutingTable.builder().addAsNew(nonExistentMetaData.index("non-existent")).build(); - String nonExistentIndexUUID = nonExistentMetaData.index("non-existent").getIndexUUID(); + String nonExistentIndexUUID = "non-existent"; + Index index = new Index("non-existent", nonExistentIndexUUID); + List nodeIds = new ArrayList<>(); + for (ObjectCursor nodeId : currentState.nodes().getNodes().keys()) { + nodeIds.add(nodeId.toString()); + } + List nonExistentShards = new ArrayList<>(); + nonExistentShards.add(nonExistentShardRouting(index, nodeIds, true)); + for (int i = 0; i < numberOfReplicas; i++) { + nonExistentShards.add(nonExistentShardRouting(index, nodeIds, false)); + } List existingShards = createExistingShards(currentState, reason); List shardsWithMismatchedAllocationIds = new ArrayList<>(); @@ -174,28 +203,32 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa ShardRouting sr = existingShard.getShardRouting(); ShardRouting nonExistentShardRouting = TestShardRouting.newShardRouting(sr.index(), sr.id(), sr.currentNodeId(), sr.relocatingNodeId(), sr.restoreSource(), sr.primary(), sr.state(), sr.version()); - shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardRoutingEntry(nonExistentShardRouting, existingShard.indexUUID, existingShard.message, existingShard.failure)); + shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardRoutingEntry(nonExistentShardRouting, nonExistentShardRouting, existingShard.message, existingShard.failure)); } List tasks = new ArrayList<>(); - tasks.addAll(toTasks(routingTable.allShards(), nonExistentIndexUUID, reason)); + nonExistentShards.forEach(shard -> tasks.add(new ShardStateAction.ShardRoutingEntry(shard, shard, reason, new CorruptIndexException("simulated", nonExistentIndexUUID)))); tasks.addAll(shardsWithMismatchedAllocationIds); return tasks; } + private ShardRouting nonExistentShardRouting(Index index, List nodeIds, boolean primary) { + return TestShardRouting.newShardRouting(index, 0, randomFrom(nodeIds), primary, randomFrom(ShardRoutingState.INITIALIZING, ShardRoutingState.RELOCATING, ShardRoutingState.STARTED), randomIntBetween(1, 8)); + } + private static void assertTasksSuccessful( List tasks, ClusterStateTaskExecutor.BatchResult result, ClusterState clusterState, boolean clusterStateChanged ) { - Map taskResultMap = - tasks.stream().collect(Collectors.toMap(Function.identity(), task -> true)); + Map taskResultMap = + tasks.stream().collect(Collectors.toMap(Function.identity(), task -> ClusterStateTaskExecutor.TaskResult.success())); assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged); } private static void assertTaskResults( - Map taskResultMap, + Map taskResultMap, ClusterStateTaskExecutor.BatchResult result, ClusterState clusterState, boolean clusterStateChanged @@ -203,24 +236,29 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa // there should be as many task results as tasks assertEquals(taskResultMap.size(), result.executionResults.size()); - for (Map.Entry entry : taskResultMap.entrySet()) { + for (Map.Entry entry : taskResultMap.entrySet()) { // every task should have a corresponding task result assertTrue(result.executionResults.containsKey(entry.getKey())); // the task results are as expected - assertEquals(entry.getValue(), result.executionResults.get(entry.getKey()).isSuccess()); + assertEquals(entry.getValue().isSuccess(), result.executionResults.get(entry.getKey()).isSuccess()); } - // every shard that we requested to be successfully failed is - // gone List shards = clusterState.getRoutingTable().allShards(); - for (Map.Entry entry : taskResultMap.entrySet()) { - if (entry.getValue()) { + for (Map.Entry entry : taskResultMap.entrySet()) { + if (entry.getValue().isSuccess()) { + // the shard was successfully failed and so should not + // be in the routing table for (ShardRouting shard : shards) { if (entry.getKey().getShardRouting().allocationId() != null) { assertThat(shard.allocationId(), not(equalTo(entry.getKey().getShardRouting().allocationId()))); } } + } else { + // check we saw the expected failure + ClusterStateTaskExecutor.TaskResult actualResult = result.executionResults.get(entry.getKey()); + assertThat(actualResult.getFailure(), instanceOf(entry.getValue().getFailure().getClass())); + assertThat(actualResult.getFailure().getMessage(), equalTo(entry.getValue().getFailure().getMessage())); } } @@ -231,11 +269,49 @@ public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCa } } - private static List toTasks(List shards, String indexUUID, String message) { + private static List toTasks(ClusterState currentState, List shards, String indexUUID, String message) { return shards .stream() - .map(shard -> new ShardStateAction.ShardRoutingEntry(shard, indexUUID, message, new CorruptIndexException("simulated", indexUUID))) + .map(shard -> new ShardStateAction.ShardRoutingEntry(shard, randomValidSourceShard(currentState, shard), message, new CorruptIndexException("simulated", indexUUID))) .collect(Collectors.toList()); } + private static ShardRouting randomValidSourceShard(ClusterState currentState, ShardRouting shardRouting) { + // for the request node ID to be valid, either the request is + // from the node the shard is assigned to, or the request is + // from the node holding the primary shard + if (randomBoolean()) { + // request from local node + return shardRouting; + } else { + // request from primary node unless in the case of + // non-existent shards there is not one and we fallback to + // the local node + ShardRouting primaryNodeId = primaryShard(currentState, shardRouting); + return primaryNodeId != null ? primaryNodeId : shardRouting; + } + } + + private static ShardRouting randomInvalidSourceShard(ClusterState currentState, ShardRouting shardRouting) { + ShardRouting primaryShard = primaryShard(currentState, shardRouting); + Set shards = + currentState + .routingTable() + .allShards() + .stream() + .filter(shard -> !shard.isSameAllocation(shardRouting)) + .filter(shard -> !shard.isSameAllocation(primaryShard)) + .collect(Collectors.toSet()); + if (!shards.isEmpty()) { + return randomSubsetOf(1, shards.toArray(new ShardRouting[0])).get(0); + } else { + return + TestShardRouting.newShardRouting(shardRouting.index(), shardRouting.id(), DiscoveryService.generateNodeId(Settings.EMPTY), randomBoolean(), randomFrom(ShardRoutingState.values()), shardRouting.version()); + } + } + + private static ShardRouting primaryShard(ClusterState currentState, ShardRouting shardRouting) { + IndexShardRoutingTable indexShard = currentState.getRoutingTable().shardRoutingTableOrNull(shardRouting.shardId()); + return indexShard == null ? null : indexShard.primaryShard(); + } } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index b6e69b27a5a..62f32e20fec 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -30,7 +30,9 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; @@ -128,13 +130,11 @@ public class ShardStateActionTests extends ESTestCase { clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - AtomicBoolean success = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); ShardRouting shardRouting = getRandomShardRouting(index); - shardStateAction.shardFailed(shardRouting, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.shardFailed(shardRouting, shardRouting, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true); @@ -174,15 +174,14 @@ public class ShardStateActionTests extends ESTestCase { noMasterBuilder.masterNodeId(null); clusterService.setState(ClusterState.builder(clusterService.state()).nodes(noMasterBuilder)); - String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - CountDownLatch latch = new CountDownLatch(1); AtomicInteger retries = new AtomicInteger(); AtomicBoolean success = new AtomicBoolean(); setUpMasterRetryVerification(1, retries, latch, requestId -> {}); - shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + ShardRouting failedShard = getRandomShardRouting(index); + shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true); @@ -208,8 +207,6 @@ public class ShardStateActionTests extends ESTestCase { clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - CountDownLatch latch = new CountDownLatch(1); AtomicInteger retries = new AtomicInteger(); AtomicBoolean success = new AtomicBoolean(); @@ -232,7 +229,8 @@ public class ShardStateActionTests extends ESTestCase { final int numberOfRetries = randomIntBetween(1, 256); setUpMasterRetryVerification(numberOfRetries, retries, latch, retryLoop); - shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + ShardRouting failedShard = getRandomShardRouting(index); + shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true); @@ -265,11 +263,10 @@ public class ShardStateActionTests extends ESTestCase { clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - AtomicBoolean failure = new AtomicBoolean(); - shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + ShardRouting failedShard = getRandomShardRouting(index); + shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { failure.set(false); @@ -295,15 +292,13 @@ public class ShardStateActionTests extends ESTestCase { clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); - String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - AtomicBoolean success = new AtomicBoolean(); CountDownLatch latch = new CountDownLatch(1); ShardRouting failedShard = getRandomShardRouting(index); RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build(); clusterService.setState(ClusterState.builder(clusterService.state()).routingTable(routingTable)); - shardStateAction.shardFailed(failedShard, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.shardFailed(failedShard, failedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true); @@ -325,6 +320,44 @@ public class ShardStateActionTests extends ESTestCase { assertTrue(success.get()); } + public void testNoLongerPrimaryShardException() throws InterruptedException { + final String index = "test"; + + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); + + ShardRouting failedShard = getRandomShardRouting(index); + + String nodeId = randomFrom(clusterService.state().nodes().nodes().keys().toArray(String.class)); + + AtomicReference failure = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + + ShardRouting sourceFailedShard = TestShardRouting.newShardRouting(failedShard.index(), failedShard.id(), nodeId, randomBoolean(), randomFrom(ShardRoutingState.values()), failedShard.version()); + shardStateAction.shardFailed(failedShard, sourceFailedShard, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + failure.set(null); + latch.countDown(); + } + + @Override + public void onFailure(Throwable t) { + failure.set(t); + latch.countDown(); + } + }); + + ShardStateAction.NoLongerPrimaryShardException catastrophicError = + new ShardStateAction.NoLongerPrimaryShardException(failedShard.shardId(), "source shard [" + sourceFailedShard + " is neither the local allocation nor the primary allocation"); + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + transport.handleRemoteError(capturedRequests[0].requestId, catastrophicError); + + latch.await(); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(ShardStateAction.NoLongerPrimaryShardException.class)); + assertThat(failure.get().getMessage(), equalTo(catastrophicError.getMessage())); + } + private ShardRouting getRandomShardRouting(String index) { IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index); ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt(); diff --git a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java index c282f3ef183..739e07df4a9 100644 --- a/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptionsIT.java @@ -56,7 +56,6 @@ import org.elasticsearch.discovery.zen.ping.ZenPing; import org.elasticsearch.discovery.zen.ping.ZenPingService; import org.elasticsearch.discovery.zen.ping.unicast.UnicastZenPing; import org.elasticsearch.discovery.zen.publish.PublishClusterStateAction; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.store.IndicesStoreIntegrationIT; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -905,7 +904,6 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { ShardRouting failedShard = randomFrom(clusterService().state().getRoutingNodes().node(nonMasterNodeId).shardsWithState(ShardRoutingState.STARTED)); ShardStateAction service = internalCluster().getInstance(ShardStateAction.class, nonMasterNode); - String indexUUID = clusterService().state().metaData().index("test").getIndexUUID(); CountDownLatch latch = new CountDownLatch(1); AtomicBoolean success = new AtomicBoolean(); @@ -913,7 +911,7 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase { NetworkPartition networkPartition = addRandomIsolation(isolatedNode); networkPartition.startDisrupting(); - service.shardFailed(failedShard, indexUUID, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() { + service.shardFailed(failedShard, failedShard, "simulated", new CorruptIndexException("simulated", (String) null), new ShardStateAction.Listener() { @Override public void onSuccess() { success.set(true);