diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java index fb22c2ca368..e5d3f06f1ec 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java @@ -120,7 +120,7 @@ public interface ClusterStateTaskExecutor { } public boolean isSuccess() { - return failure != null; + return this == SUCCESS; } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/NoOpShardStateActionListener.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/NoOpShardStateActionListener.java deleted file mode 100644 index ed0a7f56b9c..00000000000 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/NoOpShardStateActionListener.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.action.shard; - -public class NoOpShardStateActionListener implements ShardStateAction.Listener { -} diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 170d6fa0899..276edc9b23d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.MasterNodeChangePredicate; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; @@ -61,6 +62,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; @@ -209,12 +212,12 @@ public class ShardStateAction extends AbstractComponent { } } - private static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor { + static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor { private final AllocationService allocationService; private final RoutingService routingService; private final ESLogger logger; - public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) { + ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) { this.allocationService = allocationService; this.routingService = routingService; this.logger = logger; @@ -223,23 +226,56 @@ public class ShardStateAction extends AbstractComponent { @Override public BatchResult execute(ClusterState currentState, List tasks) throws Exception { BatchResult.Builder batchResultBuilder = BatchResult.builder(); - List failedShards = new ArrayList<>(tasks.size()); - for (ShardRoutingEntry task : tasks) { - failedShards.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)); - } + + // partition tasks into those that correspond to shards + // that exist versus do not exist + Map> partition = + tasks.stream().collect(Collectors.partitioningBy(task -> shardExists(currentState, task))); + + // tasks that correspond to non-existent shards are marked + // as successful + batchResultBuilder.successes(partition.get(false)); + ClusterState maybeUpdatedState = currentState; + List tasksToFail = partition.get(true); try { - RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, failedShards); + List failedShards = + tasksToFail + .stream() + .map(task -> new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)) + .collect(Collectors.toList()); + RoutingAllocation.Result result = applyFailedShards(currentState, failedShards); if (result.changed()) { maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build(); } - batchResultBuilder.successes(tasks); + batchResultBuilder.successes(tasksToFail); } catch (Throwable t) { - batchResultBuilder.failures(tasks, t); + // failures are communicated back to the requester + // cluster state will not be updated in this case + batchResultBuilder.failures(tasksToFail, t); } + return batchResultBuilder.build(maybeUpdatedState); } + // visible for testing + RoutingAllocation.Result applyFailedShards(ClusterState currentState, List failedShards) { + return allocationService.applyFailedShards(currentState, failedShards); + } + + private boolean shardExists(ClusterState currentState, ShardRoutingEntry task) { + RoutingNodes.RoutingNodeIterator routingNodeIterator = + currentState.getRoutingNodes().routingNodeIter(task.getShardRouting().currentNodeId()); + if (routingNodeIterator != null) { + for (ShardRouting maybe : routingNodeIterator) { + if (task.getShardRouting().isSameAllocation(maybe)) { + return true; + } + } + } + return false; + } + @Override public void clusterStatePublished(ClusterState newClusterState) { int numberOfUnassignedShards = newClusterState.getRoutingNodes().unassigned().size(); diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 6052d109565..34da596646d 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; -import org.elasticsearch.cluster.action.shard.NoOpShardStateActionListener; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -92,7 +91,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent tasks = Collections.emptyList(); + ClusterStateTaskExecutor.BatchResult result = + executor.execute(clusterState, tasks); + assertTasksSuccessful(tasks, result, clusterState, false); + } + + public void testDuplicateFailuresAreOkay() throws Exception { + String reason = "test duplicate failures are okay"; + ClusterState currentState = createClusterStateWithStartedShards(reason); + List tasks = createExistingShards(currentState, reason); + ClusterStateTaskExecutor.BatchResult result = executor.execute(currentState, tasks); + assertTasksSuccessful(tasks, result, clusterState, true); + } + + public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception { + String reason = "test non existent shards are marked as successful"; + ClusterState currentState = createClusterStateWithStartedShards(reason); + List tasks = createNonExistentShards(currentState, reason); + ClusterStateTaskExecutor.BatchResult result = executor.execute(clusterState, tasks); + assertTasksSuccessful(tasks, result, clusterState, false); + } + + public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Exception { + String reason = "test trivially successful tasks batched with failing tasks"; + ClusterState currentState = createClusterStateWithStartedShards(reason); + List failingTasks = createExistingShards(currentState, reason); + List nonExistentTasks = createNonExistentShards(currentState, reason); + ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) { + @Override + RoutingAllocation.Result applyFailedShards(ClusterState currentState, List failedShards) { + throw new RuntimeException("simulated applyFailedShards failure"); + } + }; + List tasks = new ArrayList<>(); + tasks.addAll(failingTasks); + tasks.addAll(nonExistentTasks); + ClusterStateTaskExecutor.BatchResult result = failingExecutor.execute(currentState, tasks); + Map taskResultMap = + failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> false)); + taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> true))); + assertTaskResults(taskResultMap, result, currentState, false); + } + + private ClusterState createClusterStateWithStartedShards(String reason) { + int numberOfNodes = 1 + numberOfReplicas; + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); + IntStream.rangeClosed(1, numberOfNodes).mapToObj(node -> newNode("node" + node)).forEach(nodes::put); + ClusterState stateAfterAddingNode = + ClusterState.builder(clusterState).nodes(nodes).build(); + RoutingTable afterReroute = + allocationService.reroute(stateAfterAddingNode, reason).routingTable(); + ClusterState stateAfterReroute = ClusterState.builder(stateAfterAddingNode).routingTable(afterReroute).build(); + RoutingNodes routingNodes = stateAfterReroute.getRoutingNodes(); + RoutingTable afterStart = + allocationService.applyStartedShards(stateAfterReroute, routingNodes.shardsWithState(ShardRoutingState.INITIALIZING)).routingTable(); + return ClusterState.builder(stateAfterReroute).routingTable(afterStart).build(); + } + + private List createExistingShards(ClusterState currentState, String reason) { + List shards = new ArrayList<>(); + GroupShardsIterator shardGroups = + currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true); + for (ShardIterator shardIt : shardGroups) { + for (ShardRouting shard : shardIt.asUnordered()) { + shards.add(shard); + } + } + List failures = randomSubsetOf(randomIntBetween(1, 1 + shards.size() / 4), shards.toArray(new ShardRouting[0])); + String indexUUID = metaData.index(INDEX).getIndexUUID(); + int numberOfTasks = randomIntBetween(failures.size(), 2 * failures.size()); + List shardsToFail = new ArrayList<>(numberOfTasks); + for (int i = 0; i < numberOfTasks; i++) { + shardsToFail.add(randomFrom(failures)); + } + return toTasks(shardsToFail, indexUUID, reason); + } + + private List createNonExistentShards(ClusterState currentState, String reason) { + // add shards from a non-existent index + MetaData nonExistentMetaData = + MetaData.builder() + .put(IndexMetaData.builder("non-existent").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(numberOfReplicas)) + .build(); + RoutingTable routingTable = RoutingTable.builder().addAsNew(nonExistentMetaData.index("non-existent")).build(); + String nonExistentIndexUUID = nonExistentMetaData.index("non-existent").getIndexUUID(); + + List existingShards = createExistingShards(currentState, reason); + List shardsWithMismatchedAllocationIds = new ArrayList<>(); + for (ShardStateAction.ShardRoutingEntry existingShard : existingShards) { + ShardRouting sr = existingShard.getShardRouting(); + ShardRouting nonExistentShardRouting = + TestShardRouting.newShardRouting(sr.index(), sr.id(), sr.currentNodeId(), sr.relocatingNodeId(), sr.restoreSource(), sr.primary(), sr.state(), sr.version()); + shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardRoutingEntry(nonExistentShardRouting, existingShard.indexUUID, existingShard.message, existingShard.failure)); + } + + List tasks = new ArrayList<>(); + tasks.addAll(toTasks(routingTable.allShards(), nonExistentIndexUUID, reason)); + tasks.addAll(shardsWithMismatchedAllocationIds); + return tasks; + } + + private static void assertTasksSuccessful( + List tasks, + ClusterStateTaskExecutor.BatchResult result, + ClusterState clusterState, + boolean clusterStateChanged + ) { + Map taskResultMap = + tasks.stream().collect(Collectors.toMap(Function.identity(), task -> true)); + assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged); + } + + private static void assertTaskResults( + Map taskResultMap, + ClusterStateTaskExecutor.BatchResult result, + ClusterState clusterState, + boolean clusterStateChanged + ) { + // there should be as many task results as tasks + assertEquals(taskResultMap.size(), result.executionResults.size()); + + for (Map.Entry entry : taskResultMap.entrySet()) { + // every task should have a corresponding task result + assertTrue(result.executionResults.containsKey(entry.getKey())); + + // the task results are as expected + assertEquals(entry.getValue(), result.executionResults.get(entry.getKey()).isSuccess()); + } + + // every shard that we requested to be successfully failed is + // gone + List shards = clusterState.getRoutingTable().allShards(); + for (Map.Entry entry : taskResultMap.entrySet()) { + if (entry.getValue()) { + for (ShardRouting shard : shards) { + if (entry.getKey().getShardRouting().allocationId() != null) { + assertThat(shard.allocationId(), not(equalTo(entry.getKey().getShardRouting().allocationId()))); + } + } + } + } + + if (clusterStateChanged) { + assertNotSame(clusterState, result.resultingState); + } else { + assertSame(clusterState, result.resultingState); + } + } + + private static List toTasks(List shards, String indexUUID, String message) { + return shards + .stream() + .map(shard -> new ShardStateAction.ShardRoutingEntry(shard, indexUUID, message, new CorruptIndexException("simulated", indexUUID))) + .collect(Collectors.toList()); + } + +} diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index c59405f2345..30d4e48551f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -27,19 +27,19 @@ import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; +import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.NodeNotConnectedException; -import org.elasticsearch.transport.RemoteTransportException; -import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -48,8 +48,6 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -293,6 +291,41 @@ public class ShardStateActionTests extends ESTestCase { assertTrue(failure.get()); } + public void testShardNotFound() throws InterruptedException { + final String index = "test"; + + clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + + String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); + + AtomicBoolean success = new AtomicBoolean(); + CountDownLatch latch = new CountDownLatch(1); + + ShardRouting failedShard = getRandomShardRouting(index); + RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build(); + clusterService.setState(ClusterState.builder(clusterService.state()).routingTable(routingTable)); + shardStateAction.shardFailed(failedShard, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + success.set(true); + latch.countDown(); + } + + @Override + public void onFailure(Throwable t) { + success.set(false); + latch.countDown(); + assert false; + } + }); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + + latch.await(); + assertTrue(success.get()); + } + private ShardRouting getRandomShardRouting(String index) { IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index); ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();