Shard failure requests for non-existent shards

This commit adds handling on the master side for shard failure requests
for shards that do not exist at the time that they are processed on the
master node (whether it be from errant requests, duplicate requests, or
both the primary and replica notifying the master of a shard
failure). Such shard failure requests should always be considered
successful (the failed shard is no longer there), but they could
previously be marked as failed if batched with a shard failure request
that does in fact fail. This change prevents an unexpected catastrophic
failure while applying the failed shards from also marking such a
request as failed, which would set additional failures in motion.
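
As a standalone illustration of the batching fix, here is a minimal
sketch of the partitioning idea (Task and its exists flag are
hypothetical stand-ins for ShardRoutingEntry and the routing-table
lookup performed on the master):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionSketch {
    // Hypothetical stand-in for ShardRoutingEntry; the real existence check
    // walks the routing nodes of the current cluster state.
    static class Task {
        final String shard;
        final boolean exists;
        Task(String shard, boolean exists) { this.shard = shard; this.exists = exists; }
        @Override public String toString() { return shard; }
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(new Task("s0", true), new Task("s1", false));

        // Split the batch the way the executor now does: tasks whose shard
        // still exists go through the allocation service; the rest are
        // trivially successful and can no longer be poisoned by a failure
        // while applying the batch.
        Map<Boolean, List<Task>> partition =
            tasks.stream().collect(Collectors.partitioningBy(task -> task.exists));

        List<Task> alreadyGone = partition.get(false); // marked successful up front
        List<Task> toFail = partition.get(true);       // only these share the batch's failure fate
        System.out.println("trivial successes: " + alreadyGone + ", to apply: " + toFail);
    }
}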

Closes #16089
Author: Jason Tedor
Date:   2016-01-14 18:40:11 -05:00
Parent: 2b2552c301
Commit: 392814ea6f

6 changed files with 325 additions and 40 deletions

File: org/elasticsearch/cluster/ClusterStateTaskExecutor.java

@@ -120,7 +120,7 @@ public interface ClusterStateTaskExecutor<T> {
         }

         public boolean isSuccess() {
-            return failure != null;
+            return this == SUCCESS;
         }

         /**
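
The one-line change above fixes an inverted success check: the removed
line reported success exactly when a failure had been recorded. A
minimal sketch of the corrected pattern, assuming (as the diff
suggests) that the enclosing task-result type keeps a nullable failure
and a shared SUCCESS instance:

public class TaskResult {
    static final TaskResult SUCCESS = new TaskResult(null);
    final Throwable failure;

    private TaskResult(Throwable failure) { this.failure = failure; }

    static TaskResult failure(Throwable t) { return new TaskResult(t); }

    public boolean isSuccess() {
        // identity against the shared success instance cannot be inverted
        // by accident, unlike the removed null check on failure
        return this == SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(SUCCESS.isSuccess());                         // true
        System.out.println(failure(new RuntimeException()).isSuccess()); // false
    }
}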

File: org/elasticsearch/cluster/action/shard/NoOpShardStateActionListener.java (deleted)

@@ -1,23 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.shard;
public class NoOpShardStateActionListener implements ShardStateAction.Listener {
}
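
This no-op class becomes unnecessary because an empty anonymous
implementation serves the same purpose, as the IndicesClusterStateService
change below shows. A sketch of why the empty body compiles, assuming
ShardStateAction.Listener declares default no-op methods (an assumption
the diff implies but does not show):

public class ListenerSketch {
    // assumed shape of ShardStateAction.Listener: default methods mean an
    // implementor need not override anything
    interface Listener {
        default void onSuccess() {}
        default void onFailure(Throwable t) {}
    }

    // equivalent of the new SHARD_STATE_ACTION_LISTENER constant
    static final Listener NO_OP = new Listener() {};

    public static void main(String[] args) {
        NO_OP.onSuccess(); // does nothing, by design
    }
}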

File: org/elasticsearch/cluster/action/shard/ShardStateAction.java

@@ -30,6 +30,7 @@ import org.elasticsearch.cluster.MasterNodeChangePredicate;
 import org.elasticsearch.cluster.NotMasterException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingService;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -61,6 +62,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;

 import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry;
@@ -209,12 +212,12 @@ public class ShardStateAction extends AbstractComponent {
         }
     }

-    private static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry> {
+    static class ShardFailedClusterStateTaskExecutor implements ClusterStateTaskExecutor<ShardRoutingEntry> {
         private final AllocationService allocationService;
         private final RoutingService routingService;
         private final ESLogger logger;

-        public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) {
+        ShardFailedClusterStateTaskExecutor(AllocationService allocationService, RoutingService routingService, ESLogger logger) {
             this.allocationService = allocationService;
             this.routingService = routingService;
             this.logger = logger;
@@ -223,23 +226,56 @@ public class ShardStateAction extends AbstractComponent {
         @Override
         public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception {
             BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder();
-            List<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>(tasks.size());
-            for (ShardRoutingEntry task : tasks) {
-                failedShards.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure));
-            }
+
+            // partition tasks into those that correspond to shards
+            // that exist versus do not exist
+            Map<Boolean, List<ShardRoutingEntry>> partition =
+                tasks.stream().collect(Collectors.partitioningBy(task -> shardExists(currentState, task)));
+
+            // tasks that correspond to non-existent shards are marked
+            // as successful
+            batchResultBuilder.successes(partition.get(false));
+
             ClusterState maybeUpdatedState = currentState;
+            List<ShardRoutingEntry> tasksToFail = partition.get(true);
             try {
-                RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, failedShards);
+                List<FailedRerouteAllocation.FailedShard> failedShards =
+                    tasksToFail
+                        .stream()
+                        .map(task -> new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure))
+                        .collect(Collectors.toList());
+                RoutingAllocation.Result result = applyFailedShards(currentState, failedShards);
                 if (result.changed()) {
                     maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build();
                 }
-                batchResultBuilder.successes(tasks);
+                batchResultBuilder.successes(tasksToFail);
             } catch (Throwable t) {
-                batchResultBuilder.failures(tasks, t);
+                // failures are communicated back to the requester
+                // cluster state will not be updated in this case
+                batchResultBuilder.failures(tasksToFail, t);
             }

             return batchResultBuilder.build(maybeUpdatedState);
         }

+        // visible for testing
+        RoutingAllocation.Result applyFailedShards(ClusterState currentState, List<FailedRerouteAllocation.FailedShard> failedShards) {
+            return allocationService.applyFailedShards(currentState, failedShards);
+        }
+
+        private boolean shardExists(ClusterState currentState, ShardRoutingEntry task) {
+            RoutingNodes.RoutingNodeIterator routingNodeIterator =
+                currentState.getRoutingNodes().routingNodeIter(task.getShardRouting().currentNodeId());
+            if (routingNodeIterator != null) {
+                for (ShardRouting maybe : routingNodeIterator) {
+                    if (task.getShardRouting().isSameAllocation(maybe)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
         @Override
         public void clusterStatePublished(ClusterState newClusterState) {
             int numberOfUnassignedShards = newClusterState.getRoutingNodes().unassigned().size();
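
A usage-level sketch of the existence check that shardExists performs
above (Routing is a hypothetical stand-in for ShardRouting; the real
method iterates the routing node for the task's current node id).
Matching on the allocation id, not just the shard id, ensures a failure
request for a stale copy does not match a freshly allocated copy of the
same shard:

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ShardExistsSketch {
    // Hypothetical stand-in for ShardRouting: the identity of a shard copy
    // is its allocation id, not merely its shard id.
    static class Routing {
        final int shardId;
        final String allocationId;
        Routing(int shardId, String allocationId) {
            this.shardId = shardId;
            this.allocationId = allocationId;
        }
        boolean isSameAllocation(Routing other) {
            return shardId == other.shardId
                && Objects.equals(allocationId, other.allocationId);
        }
    }

    // mirrors the intent of shardExists: scan the copies assigned to the
    // task's node and require the same allocation
    static boolean shardExists(List<Routing> routingsOnNode, Routing task) {
        for (Routing routing : routingsOnNode) {
            if (task.isSameAllocation(routing)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Routing> onNode = Arrays.asList(new Routing(0, "alloc-2"));
        System.out.println(shardExists(onNode, new Routing(0, "alloc-2"))); // true
        System.out.println(shardExists(onNode, new Routing(0, "alloc-1"))); // false: stale copy
    }
}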

File: org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -27,7 +27,6 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.action.index.NodeIndexDeletedAction;
 import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
-import org.elasticsearch.cluster.action.shard.NoOpShardStateActionListener;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
@@ -92,7 +91,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     private final NodeMappingRefreshAction nodeMappingRefreshAction;
     private final NodeServicesProvider nodeServicesProvider;

-    private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new NoOpShardStateActionListener();
+    private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {};

     // a map of mappings type we have seen per index due to cluster state
     // we need this so we won't remove types automatically created as part of the indexing process
@@ -754,7 +753,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         } catch (Throwable e) {
             logger.warn("failed to clean index ({})", e, reason);
         }
     }

     private void deleteIndex(String index, String reason) {

File: org/elasticsearch/cluster/action/shard/ShardFailedClusterStateTaskExecutorTests.java (new)

@@ -0,0 +1,241 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.action.shard;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.test.ESAllocationTestCase;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
public class ShardFailedClusterStateTaskExecutorTests extends ESAllocationTestCase {
private static final String INDEX = "INDEX";
private AllocationService allocationService;
private int numberOfReplicas;
private MetaData metaData;
private RoutingTable routingTable;
private ClusterState clusterState;
private ShardStateAction.ShardFailedClusterStateTaskExecutor executor;
@Before
public void setUp() throws Exception {
super.setUp();
allocationService = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 8)
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always")
.build());
numberOfReplicas = randomIntBetween(2, 16);
metaData = MetaData.builder()
.put(IndexMetaData.builder(INDEX).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(numberOfReplicas))
.build();
routingTable = RoutingTable.builder()
.addAsNew(metaData.index(INDEX))
.build();
clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
executor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger);
}
public void testEmptyTaskListProducesSameClusterState() throws Exception {
List<ShardStateAction.ShardRoutingEntry> tasks = Collections.emptyList();
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result =
executor.execute(clusterState, tasks);
assertTasksSuccessful(tasks, result, clusterState, false);
}
public void testDuplicateFailuresAreOkay() throws Exception {
String reason = "test duplicate failures are okay";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardRoutingEntry> tasks = createExistingShards(currentState, reason);
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = executor.execute(currentState, tasks);
assertTasksSuccessful(tasks, result, clusterState, true);
}
public void testNonExistentShardsAreMarkedAsSuccessful() throws Exception {
String reason = "test non existent shards are marked as successful";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardRoutingEntry> tasks = createNonExistentShards(currentState, reason);
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = executor.execute(clusterState, tasks);
assertTasksSuccessful(tasks, result, clusterState, false);
}
public void testTriviallySuccessfulTasksBatchedWithFailingTasks() throws Exception {
String reason = "test trivially successful tasks batched with failing tasks";
ClusterState currentState = createClusterStateWithStartedShards(reason);
List<ShardStateAction.ShardRoutingEntry> failingTasks = createExistingShards(currentState, reason);
List<ShardStateAction.ShardRoutingEntry> nonExistentTasks = createNonExistentShards(currentState, reason);
ShardStateAction.ShardFailedClusterStateTaskExecutor failingExecutor = new ShardStateAction.ShardFailedClusterStateTaskExecutor(allocationService, null, logger) {
@Override
RoutingAllocation.Result applyFailedShards(ClusterState currentState, List<FailedRerouteAllocation.FailedShard> failedShards) {
throw new RuntimeException("simulated applyFailedShards failure");
}
};
List<ShardStateAction.ShardRoutingEntry> tasks = new ArrayList<>();
tasks.addAll(failingTasks);
tasks.addAll(nonExistentTasks);
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result = failingExecutor.execute(currentState, tasks);
Map<ShardStateAction.ShardRoutingEntry, Boolean> taskResultMap =
failingTasks.stream().collect(Collectors.toMap(Function.identity(), task -> false));
taskResultMap.putAll(nonExistentTasks.stream().collect(Collectors.toMap(Function.identity(), task -> true)));
assertTaskResults(taskResultMap, result, currentState, false);
}
private ClusterState createClusterStateWithStartedShards(String reason) {
int numberOfNodes = 1 + numberOfReplicas;
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
IntStream.rangeClosed(1, numberOfNodes).mapToObj(node -> newNode("node" + node)).forEach(nodes::put);
ClusterState stateAfterAddingNode =
ClusterState.builder(clusterState).nodes(nodes).build();
RoutingTable afterReroute =
allocationService.reroute(stateAfterAddingNode, reason).routingTable();
ClusterState stateAfterReroute = ClusterState.builder(stateAfterAddingNode).routingTable(afterReroute).build();
RoutingNodes routingNodes = stateAfterReroute.getRoutingNodes();
RoutingTable afterStart =
allocationService.applyStartedShards(stateAfterReroute, routingNodes.shardsWithState(ShardRoutingState.INITIALIZING)).routingTable();
return ClusterState.builder(stateAfterReroute).routingTable(afterStart).build();
}
private List<ShardStateAction.ShardRoutingEntry> createExistingShards(ClusterState currentState, String reason) {
List<ShardRouting> shards = new ArrayList<>();
GroupShardsIterator shardGroups =
currentState.routingTable().allAssignedShardsGrouped(new String[] { INDEX }, true);
for (ShardIterator shardIt : shardGroups) {
for (ShardRouting shard : shardIt.asUnordered()) {
shards.add(shard);
}
}
List<ShardRouting> failures = randomSubsetOf(randomIntBetween(1, 1 + shards.size() / 4), shards.toArray(new ShardRouting[0]));
String indexUUID = metaData.index(INDEX).getIndexUUID();
int numberOfTasks = randomIntBetween(failures.size(), 2 * failures.size());
List<ShardRouting> shardsToFail = new ArrayList<>(numberOfTasks);
for (int i = 0; i < numberOfTasks; i++) {
shardsToFail.add(randomFrom(failures));
}
return toTasks(shardsToFail, indexUUID, reason);
}
private List<ShardStateAction.ShardRoutingEntry> createNonExistentShards(ClusterState currentState, String reason) {
// add shards from a non-existent index
MetaData nonExistentMetaData =
MetaData.builder()
.put(IndexMetaData.builder("non-existent").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(numberOfReplicas))
.build();
RoutingTable routingTable = RoutingTable.builder().addAsNew(nonExistentMetaData.index("non-existent")).build();
String nonExistentIndexUUID = nonExistentMetaData.index("non-existent").getIndexUUID();
List<ShardStateAction.ShardRoutingEntry> existingShards = createExistingShards(currentState, reason);
List<ShardStateAction.ShardRoutingEntry> shardsWithMismatchedAllocationIds = new ArrayList<>();
for (ShardStateAction.ShardRoutingEntry existingShard : existingShards) {
ShardRouting sr = existingShard.getShardRouting();
ShardRouting nonExistentShardRouting =
TestShardRouting.newShardRouting(sr.index(), sr.id(), sr.currentNodeId(), sr.relocatingNodeId(), sr.restoreSource(), sr.primary(), sr.state(), sr.version());
shardsWithMismatchedAllocationIds.add(new ShardStateAction.ShardRoutingEntry(nonExistentShardRouting, existingShard.indexUUID, existingShard.message, existingShard.failure));
}
List<ShardStateAction.ShardRoutingEntry> tasks = new ArrayList<>();
tasks.addAll(toTasks(routingTable.allShards(), nonExistentIndexUUID, reason));
tasks.addAll(shardsWithMismatchedAllocationIds);
return tasks;
}
private static void assertTasksSuccessful(
List<ShardStateAction.ShardRoutingEntry> tasks,
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result,
ClusterState clusterState,
boolean clusterStateChanged
) {
Map<ShardStateAction.ShardRoutingEntry, Boolean> taskResultMap =
tasks.stream().collect(Collectors.toMap(Function.identity(), task -> true));
assertTaskResults(taskResultMap, result, clusterState, clusterStateChanged);
}
private static void assertTaskResults(
Map<ShardStateAction.ShardRoutingEntry, Boolean> taskResultMap,
ClusterStateTaskExecutor.BatchResult<ShardStateAction.ShardRoutingEntry> result,
ClusterState clusterState,
boolean clusterStateChanged
) {
// there should be as many task results as tasks
assertEquals(taskResultMap.size(), result.executionResults.size());
for (Map.Entry<ShardStateAction.ShardRoutingEntry, Boolean> entry : taskResultMap.entrySet()) {
// every task should have a corresponding task result
assertTrue(result.executionResults.containsKey(entry.getKey()));
// the task results are as expected
assertEquals(entry.getValue(), result.executionResults.get(entry.getKey()).isSuccess());
}
// every shard that we requested to be successfully failed is
// gone
List<ShardRouting> shards = clusterState.getRoutingTable().allShards();
for (Map.Entry<ShardStateAction.ShardRoutingEntry, Boolean> entry : taskResultMap.entrySet()) {
if (entry.getValue()) {
for (ShardRouting shard : shards) {
if (entry.getKey().getShardRouting().allocationId() != null) {
assertThat(shard.allocationId(), not(equalTo(entry.getKey().getShardRouting().allocationId())));
}
}
}
}
if (clusterStateChanged) {
assertNotSame(clusterState, result.resultingState);
} else {
assertSame(clusterState, result.resultingState);
}
}
private static List<ShardStateAction.ShardRoutingEntry> toTasks(List<ShardRouting> shards, String indexUUID, String message) {
return shards
.stream()
.map(shard -> new ShardStateAction.ShardRoutingEntry(shard, indexUUID, message, new CorruptIndexException("simulated", indexUUID)))
.collect(Collectors.toList());
}
}

File: org/elasticsearch/cluster/action/shard/ShardStateActionTests.java

@@ -27,19 +27,19 @@ import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NodeDisconnectedException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.SendRequestTransportException;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
@@ -48,8 +48,6 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -293,6 +291,41 @@ public class ShardStateActionTests extends ESTestCase {
         assertTrue(failure.get());
     }

+    public void testShardNotFound() throws InterruptedException {
+        final String index = "test";
+
+        clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5)));
+
+        String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
+
+        AtomicBoolean success = new AtomicBoolean();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        ShardRouting failedShard = getRandomShardRouting(index);
+        RoutingTable routingTable = RoutingTable.builder(clusterService.state().getRoutingTable()).remove(index).build();
+        clusterService.setState(ClusterState.builder(clusterService.state()).routingTable(routingTable));
+        shardStateAction.shardFailed(failedShard, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
+            @Override
+            public void onSuccess() {
+                success.set(true);
+                latch.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                success.set(false);
+                latch.countDown();
+                assert false;
+            }
+        });
+
+        CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
+        transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
+
+        latch.await();
+        assertTrue(success.get());
+    }
+
     private ShardRouting getRandomShardRouting(String index) {
         IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index);
         ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt();