Failed shards could be re-assigned to the same nodes if multiple replicas failed at once

After a shard fails on a node we assign a new replica on another node. This is important in order to avoid failing again due to node specific problems. In the rare case where two different replicas of the same shard failed in a short time span, we may fail to do so and assign one of them back to the node it's currently on. This happens if both shard failed events are processed within the same batch on the master.

Closes #5725
This commit is contained in:
Boaz Leskes 2014-04-08 10:28:31 +02:00
parent c58b823e9f
commit e8467f0978
3 changed files with 80 additions and 17 deletions

View File

@ -29,7 +29,9 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* The {@link RoutingAllocation} keep the state of the current allocation * The {@link RoutingAllocation} keep the state of the current allocation
@ -107,7 +109,7 @@ public class RoutingAllocation {
private final ClusterInfo clusterInfo; private final ClusterInfo clusterInfo;
private Map<ShardId, String> ignoredShardToNodes = null; private Map<ShardId, Set<String>> ignoredShardToNodes = null;
private boolean ignoreDisable = false; private boolean ignoreDisable = false;
@ -199,11 +201,20 @@ public class RoutingAllocation {
if (ignoredShardToNodes == null) { if (ignoredShardToNodes == null) {
ignoredShardToNodes = new HashMap<>(); ignoredShardToNodes = new HashMap<>();
} }
ignoredShardToNodes.put(shardId, nodeId); Set<String> nodes = ignoredShardToNodes.get(shardId);
if (nodes == null) {
nodes = new HashSet<>();
ignoredShardToNodes.put(shardId, nodes);
}
nodes.add(nodeId);
} }
public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) { public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
return ignoredShardToNodes != null && nodeId.equals(ignoredShardToNodes.get(shardId)); if (ignoredShardToNodes == null) {
return false;
}
Set<String> nodes = ignoredShardToNodes.get(shardId);
return nodes != null && nodes.contains(nodeId);
} }
/** /**

View File

@ -60,11 +60,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.*;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryStatus;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.recovery.StartRecoveryRequest;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.HashMap; import java.util.HashMap;

View File

@ -23,10 +23,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ImmutableShardRouting; import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
@ -34,6 +31,8 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ElasticsearchAllocationTestCase; import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
@ -63,8 +62,8 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
logger.info("--> adding 2 nodes on same rack and do rerouting"); logger.info("--> adding 2 nodes on same rack and do rerouting");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("node1")) .put(newNode("node1"))
.put(newNode("node2")) .put(newNode("node2"))
).build(); ).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState); RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
@ -85,7 +84,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
logger.info("--> adding additional node"); logger.info("--> adding additional node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node3")) .put(newNode("node3"))
).build(); ).build();
rerouteResult = allocation.reroute(clusterState); rerouteResult = allocation.reroute(clusterState);
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
@ -101,7 +100,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
logger.info("--> moving primary shard to node3"); logger.info("--> moving primary shard to node3");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands( rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")) new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
); );
assertThat(rerouteResult.changed(), equalTo(true)); assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
@ -117,7 +116,7 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
logger.info("--> moving primary shard to node3"); logger.info("--> moving primary shard to node3");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands( rerouteResult = allocation.reroute(clusterState, new AllocationCommands(
new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3")) new MoveAllocationCommand(clusterState.routingTable().index("test").shard(0).primaryShard().shardId(), clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId(), "node3"))
); );
assertThat(rerouteResult.changed(), equalTo(true)); assertThat(rerouteResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
@ -273,6 +272,63 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false)); assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false));
} }
@Test
public void singleShardMultipleAllocationFailures() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
logger.info("Building initial routing table");
int numberOfReplicas = scaledRandomIntBetween(2, 10);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(numberOfReplicas))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("Adding {} nodes and performing rerouting", numberOfReplicas + 1);
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder();
for (int i = 0; i < numberOfReplicas + 1; i++) {
nodeBuilder.put(newNode("node" + Integer.toString(i)));
}
clusterState = ClusterState.builder(clusterState).nodes(nodeBuilder).build();
while (!clusterState.routingTable().shardsWithState(UNASSIGNED).isEmpty()) {
// start all initializing
clusterState = ClusterState.builder(clusterState)
.routingTable(strategy
.applyStartedShards(clusterState, clusterState.routingTable().shardsWithState(INITIALIZING)).routingTable()
)
.build();
// and assign more unassigned
clusterState = ClusterState.builder(clusterState).routingTable(strategy.reroute(clusterState).routingTable()).build();
}
int shardsToFail = randomIntBetween(1, numberOfReplicas);
ArrayList<ShardRouting> failedShards = new ArrayList<>();
RoutingNodes routingNodes = clusterState.routingNodes();
for (int i = 0; i < shardsToFail; i++) {
String n = "node" + Integer.toString(randomInt(numberOfReplicas));
logger.info("failing shard on node [{}]", n);
ShardRouting shardToFail = routingNodes.node(n).get(0);
failedShards.add(new MutableShardRouting(shardToFail));
}
routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (ShardRouting failedShard : failedShards) {
if (!routingNodes.node(failedShard.currentNodeId()).isEmpty()) {
fail("shard " + failedShard + " was re-assigned to it's node");
}
}
}
@Test @Test
public void firstAllocationFailureTwoNodes() { public void firstAllocationFailureTwoNodes() {
AllocationService strategy = createAllocationService(settingsBuilder() AllocationService strategy = createAllocationService(settingsBuilder()