From 392814ea6f5f62f5f1d2232645db92da01d59914 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Thu, 14 Jan 2016 18:40:11 -0500
Subject: [PATCH] Shard failure requests for non-existent shards

This commit adds handling on the master side for shard failure requests
for shards that do not exist at the time they are processed on the
master node (whether from errant requests, duplicate requests, or both
the primary and the replica notifying the master of a shard failure).
This change is made because such shard failure requests should always
be considered successful (the failed shard is no longer there), but
they could be marked as failed if batched with a shard failure request
that does in fact fail. This avoids the possibility that an unexpected
failure while applying the failed shards causes such a request to also
be marked as failed, setting additional failures in motion.

Closes #16089
---
 .../cluster/ClusterStateTaskExecutor.java     |   2 +-
 .../shard/NoOpShardStateActionListener.java   |  23 --
 .../action/shard/ShardStateAction.java        |  54 +++-
 .../cluster/IndicesClusterStateService.java   |   4 +-
 ...rdFailedClusterStateTaskExecutorTests.java | 241 ++++++++++++++++++
 .../action/shard/ShardStateActionTests.java   |  41 ++-
 6 files changed, 325 insertions(+), 40 deletions(-)
 delete mode 100644 core/src/main/java/org/elasticsearch/cluster/action/shard/NoOpShardStateActionListener.java
 create mode 100644 core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java

diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
index fb22c2ca368..e5d3f06f1ec 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java
@@ -120,7 +120,7 @@ public interface ClusterStateTaskExecutor<T> {
         }
 
         public boolean isSuccess() {
-            return failure != null;
+            return this == SUCCESS;
         }
 
         /**
diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/NoOpShardStateActionListener.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/NoOpShardStateActionListener.java
deleted file mode 100644
index ed0a7f56b9c..00000000000
--- a/core/src/main/java/org/elasticsearch/cluster/action/shard/NoOpShardStateActionListener.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.cluster.action.shard;
-
-public class NoOpShardStateActionListener implements ShardStateAction.Listener {
-}
diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
index 170d6fa0899..276edc9b23d 100644
--- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
+++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java
@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.MasterNodeChangePredicate;
 import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingService;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -61,6 +62,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
 
@@ -209,12 +212,12 @@ public class ShardStateAction extends AbstractComponent {
         }
     }
 
-    private static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry> {
+    static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry> {
         private final AllocationService allocationService;
         private final RoutingService routingService;
         private final ESLogger logger;
 
-        public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) {
+        ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) {
             this.allocationService = allocationService;
             this.routingService = routingService;
             this.logger = logger;
@@ -223,23 +226,56 @@ public class ShardStateAction extends AbstractComponent {
         @Override
         public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
             BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
-            List<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>(tasks.size());
-            for (ShardRoutingEntry task : tasks) {
-                failedShards.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure));
-            }
+
+            // partition tasks into those that correspond to shards
+            // that exist versus do not exist
+            Map<Boolean, List<ShardRoutingEntry>> partition =
+                tasks.stream().collect(Collectors.partitioningBy(task -> shardExists(currentState, task)));
+
+            // tasks that correspond to non-existent shards are marked
+            // as successful
+            batchResultBuilder.successes(partition.get(false));
+
             ClusterState maybeUpdatedState = currentState;
+            List<ShardRoutingEntry> tasksToFail = partition.get(true);
             try {
-                RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, failedShards);
+                List<FailedRerouteAllocation.FailedShard> failedShards =
+                    tasksToFail
+                        .stream()
+                        .map(task -> new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure))
+                        .collect(Collectors.toList());
+                RoutingAllocation.Result result = applyFailedShards(currentState, failedShards);
                 if (result.changed()) {
                     maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
                 }
-                batchResultBuilder.successes(tasks);
+                batchResultBuilder.successes(tasksToFail);
             } catch (Throwable t) {
-                batchResultBuilder.failures(tasks, t);
+                // failures are communicated back to the requester
+                // cluster state will not be updated in this case
+                batchResultBuilder.failures(tasksToFail, t);
             }
+
             return batchResultBuilder.build(maybeUpdatedState);
         }
 
+        // visible for testing
+        RoutingAllocation.Result applyFailedShards(ClusterState currentState, List<FailedRerouteAllocation.FailedShard> failedShards) {
+            return allocationService.applyFailedShards(currentState, failedShards);
+        }
+
+        private boolean shardExists(ClusterState currentState, ShardRoutingEntry task) {
+            RoutingNodes.RoutingNodeIterator routingNodeIterator =
+                currentState.getRoutingNodes().routingNodeIter(task.getShardRouting().currentNodeId());
+            if (routingNodeIterator != null) {
+                for (ShardRouting maybe : routingNodeIterator) {
+                    if (task.getShardRouting().isSameAllocation(maybe)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
         @Override
         public void clusterStatePublished(ClusterState newClusterState) {
             int numberOfUnassignedShards = newClusterState.getRoutingNodes().unassigned().size();
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index 6052d109565..34da596646d 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -27,7 +27,6 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
 import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
-import org.elasticsearch.cluster.action.shard.NoOpShardStateActionListener;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -92,7 +91,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent
diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java
new file mode 100644
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java
+        List<ShardStateAction.ShardRoutingEntry> tasks = Collections.emptyList();
+        ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result =
+            executor.execute(clusterState, tasks);
+        assertTasksSuccessful(tasks, result, clusterState, false);
+    }
+
+    public void testDuplicateFailuresAreOkay() throws Exception {
+        String reason = "test duplicate failures are okay";
+        ClusterState currentState = createClusterStateWithStartedShards(reason);
+        List<ShardStateAction.ShardRoutingEntry> tasks = createExistingShards(currentState, reason);
+        ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = executor.execute(currentState, tasks);
+        assertTasksSuccessful(tasks, result, clusterState, true);
+    }
+
+    public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {
+        String reason = "test non existent shards are marked as successful";
+        ClusterState currentState = createClusterStateWithStartedShards(reason);
+        List<ShardStateAction.ShardRoutingEntry> tasks = createNonExistentShards(currentState, reason);
+        ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = executor.execute(clusterState, tasks);
+        assertTasksSuccessful(tasks, result, clusterState, false);
+    }
+
+    public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Exception {
+        String reason = "test trivially successful tasks batched with failing tasks";
+        ClusterState currentState = createClusterStateWithStartedShards(reason);
+        List<ShardStateAction.ShardRoutingEntry> failingTasks = createExistingShards(currentState, reason);
+        List<ShardStateAction.ShardRoutingEntry> nonExistentTasks = createNonExistentShards(currentState, reason);
+        ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor =
+            new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) {
+                @Override
+                RoutingAllocation.Result applyFailedShards(ClusterState currentState, List<FailedRerouteAllocation.FailedShard> failedShards) {
+                    throw new RuntimeException("simulated applyFailedShards failure");
+                }
+            };
+        List<ShardStateAction.ShardRoutingEntry> tasks = new ArrayList<>();
+        tasks.addAll(failingTasks);
+        tasks.addAll(nonExistentTasks);
+        ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = failingExecutor.execute(currentState, tasks);
+        Map<ShardStateAction.ShardRoutingEntry, Boolean> taskResultMap =
+            failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> false));
+        taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> true)));
+        assertTaskResults(taskResultMap, result, currentState, false);
+    }
+
+    private ClusterState createClusterStateWithStartedShards(String reason) {
+        int numberOfNodes = 1 + numberOfReplicas;
+        DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
+        IntStream.rangeClosed(1, numberOfNodes).mapToObj(node -> newNode("node" + node)).forEach(nodes::put);
+        ClusterState stateAfterAddingNode =
+            ClusterState.builder(clusterState).nodes(nodes).build();
+        RoutingTable afterReroute =
+            allocationService.reroute(stateAfterAddingNode, reason).routingTable();
+        ClusterState stateAfterReroute = ClusterState.builder(stateAfterAddingNode).routingTable(afterReroute).build();
+        RoutingNodes routingNodes = stateAfterReroute.getRoutingNodes();
+        RoutingTable afterStart =
+            allocationService.applyStartedShards(stateAfterReroute, routingNodes.shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
+        return ClusterState.builder(stateAfterReroute).routingTable(afterStart).build();
+    }
+
+    private List<ShardStateAction.ShardRoutingEntry> createExistingShards(ClusterState currentState, String reason) {
+        List<ShardRouting> shards = new ArrayList<>();
+        GroupShardsIterator shardGroups =
+            currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true);
+        for (ShardIterator shardIt : shardGroups) {
+            for (ShardRouting shard : shardIt.asUnordered()) {
+                shards.add(shard);
+            }
+        }
+        List<ShardRouting> failures = randomSubsetOf(randomIntBetween(1, 1 + shards.size() / 4), shards.toArray(new ShardRouting[0]));
+        String indexUUID = metaData.index(INDEX).getIndexUUID();
+        int numberOfTasks = randomIntBetween(failures.size(), 2 * failures.size());
+        List<ShardRouting> shardsToFail = new ArrayList<>(numberOfTasks);
+        for (int i = 0; i < numberOfTasks; i++) {
+            shardsToFail.add(randomFrom(failures));
+        }
+        return toTasks(shardsToFail, indexUUID, reason);
+    }
+
+    private List<ShardStateAction.ShardRoutingEntry> createNonExistentShards(ClusterState currentState, String reason) {
+        // add shards from a non-existent index
+        MetaData nonExistentMetaData =
+            MetaData.builder()
+                .put(IndexMetaData.builder("non-existent").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(numberOfReplicas))
+                .build();
+        RoutingTable routingTable = RoutingTable.builder().addAsNew(nonExistentMetaData.index("non-existent")).build();
+        String nonExistentIndexUUID = nonExistentMetaData.index("non-existent").getIndexUUID();
+
+        List<ShardStateAction.ShardRoutingEntry> existingShards = createExistingShards(currentState, reason);
+        List<ShardStateAction.ShardRoutingEntry> shardsWithMismatchedAllocationIds = new ArrayList<>();
+        for (ShardStateAction.ShardRoutingEntry existingShard : existingShards) {
+            ShardRouting sr = existingShard.getShardRouting();
+            ShardRouting nonExistentShardRouting =
+                TestShardRouting.newShardRouting(sr.index(), sr.id(), sr.currentNodeId(), sr.relocatingNodeId(), sr.restoreSource(), sr.primary(), sr.state(), sr.version());
+            shardsWithMismatchedAllocationIds.add(
+                new ShardStateAction.ShardRoutingEntry(nonExistentShardRouting, existingShard.indexUUID, existingShard.message, existingShard.failure));
+        }
+
+        List<ShardStateAction.ShardRoutingEntry> tasks = new ArrayList<>();
+        tasks.addAll(toTasks(routingTable.allShards(), nonExistentIndexUUID, reason));
+        tasks.addAll(shardsWithMismatchedAllocationIds);
+        return tasks;
+    }
+
+    private static void assertTasksSuccessful(
+        List<ShardStateAction.ShardRoutingEntry> tasks,
+        ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result,
+        ClusterState clusterState,
+        boolean clusterStateChanged
+    ) {
+        Map<ShardStateAction.ShardRoutingEntry, Boolean> taskResultMap =
+            tasks.stream().collect(Collectors.toMap(Function.identity(), task -> true));
+        assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged);
+    }
+
+    private static void assertTaskResults(
+        Map<ShardStateAction.ShardRoutingEntry, Boolean> taskResultMap,
+        ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result,
+        ClusterState clusterState,
+        boolean clusterStateChanged
+    ) {
+        // there should be as many task results as tasks
+        assertEquals(taskResultMap.size(), result.executionResults.size());
+
+        for (Map.Entry<ShardStateAction.ShardRoutingEntry, Boolean> entry : taskResultMap.entrySet()) {
+            // every task should have a corresponding task result
+            assertTrue(result.executionResults.containsKey(entry.getKey()));
+
+            // the task results are as expected
+            assertEquals(entry.getValue(), result.executionResults.get(entry.getKey()).isSuccess());
+        }
+
+        // every shard that we requested to be successfully failed is
+        // gone
+        List<ShardRouting> shards = clusterState.getRoutingTable().allShards();
+        for (Map.Entry<ShardStateAction.ShardRoutingEntry, Boolean> entry : taskResultMap.entrySet()) {
+            if (entry.getValue()) {
+                for (ShardRouting shard : shards) {
+                    if (entry.getKey().getShardRouting().allocationId() != null) {
+                        assertThat(shard.allocationId(), not(equalTo(entry.getKey().getShardRouting().allocationId())));
+                    }
+                }
+            }
+        }
+
+        if (clusterStateChanged) {
+            assertNotSame(clusterState, result.resultingState);
+        } else {
+            assertSame(clusterState, result.resultingState);
+        }
+    }
+
+    private static List<ShardStateAction.ShardRoutingEntry> toTasks(List<ShardRouting> shards, String indexUUID, String message) {
+        return shards
+            .stream()
+            .map(shard -> new ShardStateAction.ShardRoutingEntry(shard, indexUUID, message, new CorruptIndexException("simulated", indexUUID)))
+            .collect(Collectors.toList());
+    }
+
+}
diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java
index c59405f2345..30d4e48551f 100644
--- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java
@@ -27,19 +27,19 @@ import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingService;
+import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardsIterator;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.discovery.Discovery;
+import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.cluster.TestClusterService;
 import org.elasticsearch.test.transport.CapturingTransport;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.NodeDisconnectedException;
 import org.elasticsearch.transport.NodeNotConnectedException;
-import org.elasticsearch.transport.RemoteTransportException;
-import org.elasticsearch.transport.SendRequestTransportException;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportResponse;
 import org.elasticsearch.transport.TransportService;
@@ -48,8 +48,6 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -293,6 +291,41 @@ public class ShardStateActionTests extends ESTestCase {
         assertTrue(failure.get());
     }
 
+    public void testShardNotFound() throws InterruptedException {
+        final String index = "test";
+
+        clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
+
+        String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
+
+        AtomicBoolean success = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        ShardRouting failedShard = getRandomShardRouting(index);
+        RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build();
+        clusterService.setState(ClusterState.builder(clusterService.state()).routingTable(routingTable));
+        shardStateAction.shardFailed(failedShard, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
+            @Override
+            public void onSuccess() {
+                success.set(true);
+                latch.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                success.set(false);
+                latch.countDown();
+                assert false;
+            }
+        });
+
+        CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
+        transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
+
+        latch.await();
+        assertTrue(success.get());
+    }
+
     private ShardRouting getRandomShardRouting(String index) {
         IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
         ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();
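
The heart of the change above is how ShardFailedClusterStateTaskExecutor.execute splits a batch: tasks whose shard no longer exists in the current routing nodes are acknowledged as successful immediately, and only the remaining tasks go through applyFailedShards, so an exception there can no longer drag the trivially successful tasks down with it. The following standalone sketch, written against plain JDK collections rather than Elasticsearch types (the allocation-id strings and the liveAllocations set are simplified stand-ins for ShardRoutingEntry and the routing-table lookup), illustrates the same Collectors.partitioningBy pattern:

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionByShardExistence {
    public static void main(String[] args) {
        // Allocation ids still present in a (hypothetical) routing table.
        Set<String> liveAllocations = new HashSet<>(Arrays.asList("alloc-1", "alloc-2"));

        // Incoming shard-failure tasks, identified here only by allocation id;
        // "alloc-9" refers to a shard that is already gone (e.g. a duplicate failure).
        List<String> tasks = Arrays.asList("alloc-1", "alloc-9", "alloc-2");

        // Partition the batch: true = shard still exists, false = shard does not exist.
        Map<Boolean, List<String>> partition =
            tasks.stream().collect(Collectors.partitioningBy(liveAllocations::contains));

        // Tasks for non-existent shards are trivially successful and are acknowledged
        // up front; only the remaining tasks are run through the failure-application
        // step, so an exception there cannot mark the trivial successes as failed.
        List<String> triviallySuccessful = partition.get(false);
        List<String> tasksToFail = partition.get(true);

        System.out.println("trivially successful: " + triviallySuccessful); // [alloc-9]
        System.out.println("to apply as failures: " + tasksToFail);         // [alloc-1, alloc-2]
    }
}

Note that Collectors.partitioningBy always populates both the true and the false key, which is why the executor can call batchResultBuilder.successes(partition.get(false)) without a null check.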