Better handling of shard failures, closes #845.

This commit is contained in:
kimchy 2011-04-10 21:59:59 +03:00
parent 8eab5ec528
commit 7d8726a5e8
9 changed files with 321 additions and 345 deletions

View File

@ -107,20 +107,16 @@ public class ShardStateAction extends AbstractComponent {
logger.warn("received shard failed for {}, reason [{}]", shardRouting, reason);
clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());
// if there is no routing table, the index has been deleted while it was being allocated
// which is fine, we should just ignore this
if (indexRoutingTable == null) {
if (logger.isDebugEnabled()) {
logger.debug("Received failed shard {}, reason [{}]", shardRouting, reason);
}
RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShard(currentState, shardRouting);
if (!routingResult.changed()) {
return currentState;
}
if (logger.isDebugEnabled()) {
logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason);
}
RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShards(currentState, newArrayList(shardRouting));
if (!routingResult.changed()) {
return currentState;
}
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
}
});

View File

@ -23,21 +23,19 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import java.util.List;
/**
* @author kimchy (shay.banon)
*/
public class FailedRerouteAllocation extends RoutingAllocation {
private final List<? extends ShardRouting> failedShards;
private final ShardRouting failedShard;
public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> failedShards) {
public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, ShardRouting failedShard) {
super(routingNodes, nodes);
this.failedShards = failedShards;
this.failedShard = failedShard;
}
public List<? extends ShardRouting> failedShards() {
return failedShards;
public ShardRouting failedShard() {
return failedShard;
}
}

View File

@ -84,6 +84,11 @@ public class NodeAllocations extends NodeAllocation {
@Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Decision ret = Decision.YES;
// first, check if its in the ignored, if so, return NO
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
return Decision.NO;
}
// now, go over the registered allocations
for (NodeAllocation allocation1 : allocations) {
Decision decision = allocation1.canAllocate(shardRouting, node, allocation);
if (decision == Decision.NO) {

View File

@ -22,6 +22,10 @@ package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.index.shard.ShardId;
import java.util.HashMap;
import java.util.Map;
/**
* @author kimchy (shay.banon)
@ -61,6 +65,8 @@ public class RoutingAllocation {
private final AllocationExplanation explanation = new AllocationExplanation();
private Map<ShardId, String> ignoredShardToNodes = null;
public RoutingAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes) {
this.routingNodes = routingNodes;
this.nodes = nodes;
@ -81,4 +87,15 @@ public class RoutingAllocation {
public AllocationExplanation explanation() {
return explanation;
}
public void addIgnoreShardForNode(ShardId shardId, String nodeId) {
if (ignoredShardToNodes == null) {
ignoredShardToNodes = new HashMap<ShardId, String>();
}
ignoredShardToNodes.put(shardId, nodeId);
}
public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) {
return ignoredShardToNodes != null && nodeId.equals(ignoredShardToNodes.get(shardId));
}
}

View File

@ -76,16 +76,15 @@ public class ShardsAllocation extends AbstractComponent {
*
* <p>If the same instance of the routing table is returned, then no change has been made.
*/
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<? extends ShardRouting> failedShards) {
public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) {
RoutingNodes routingNodes = clusterState.routingNodes();
FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShards);
nodeAllocations.applyFailedShards(nodeAllocations, allocation);
boolean changed = applyFailedShards(allocation);
FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShard);
boolean changed = applyFailedShard(allocation);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
}
// If we reroute again, the failed shard will try and be assigned to the same node, which we do no do in the applyFailedShards
// reroute(routingNodes, clusterState.nodes());
nodeAllocations.applyFailedShards(nodeAllocations, allocation);
reroute(allocation);
return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation());
}
@ -384,83 +383,75 @@ public class ShardsAllocation extends AbstractComponent {
return dirty;
}
private boolean applyFailedShards(FailedRerouteAllocation allocation) {
boolean dirty = false;
// apply shards might be called several times with the same shard, ignore it
for (ShardRouting failedShard : allocation.failedShards()) {
/**
* Applies the relevant logic to handle a failed shard. Returns <tt>true</tt> if changes happened that
* require relocation.
*/
private boolean applyFailedShard(FailedRerouteAllocation allocation) {
IndexRoutingTable indexRoutingTable = allocation.routingTable().index(allocation.failedShard().index());
if (indexRoutingTable == null) {
return false;
}
boolean shardDirty = false;
boolean inRelocation = failedShard.relocatingNodeId() != null;
if (inRelocation) {
RoutingNode routingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId());
if (routingNode != null) {
Iterator<MutableShardRouting> shards = routingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
shard.deassignNode();
shards.remove();
break;
}
}
}
}
ShardRouting failedShard = allocation.failedShard();
String nodeId = inRelocation ? failedShard.relocatingNodeId() : failedShard.currentNodeId();
RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(nodeId);
if (currentRoutingNode == null) {
// already failed (might be called several times for the same shard)
continue;
}
Iterator<MutableShardRouting> shards = currentRoutingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
if (!inRelocation) {
boolean shardDirty = false;
boolean inRelocation = failedShard.relocatingNodeId() != null;
if (inRelocation) {
RoutingNode routingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId());
if (routingNode != null) {
Iterator<MutableShardRouting> shards = routingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
shard.deassignNode();
shards.remove();
} else {
shard.cancelRelocation();
break;
}
break;
}
}
if (!shardDirty) {
continue;
} else {
dirty = true;
}
// if in relocation no need to find a new target, just cancel the relocation.
if (inRelocation) {
continue;
}
// not in relocation so find a new target.
boolean allocated = false;
List<RoutingNode> sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh();
for (RoutingNode target : sortedNodesLeastToHigh) {
if (!target.nodeId().equals(failedShard.currentNodeId()) &&
nodeAllocations.canAllocate(failedShard, target, allocation).allocate()) {
target.add(new MutableShardRouting(failedShard.index(), failedShard.id(),
target.nodeId(), failedShard.relocatingNodeId(),
failedShard.primary(), INITIALIZING));
allocated = true;
break;
}
}
if (!allocated) {
// we did not manage to allocate it, put it in the unassigned
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
null, failedShard.primary(), ShardRoutingState.UNASSIGNED));
}
}
return dirty;
String nodeId = inRelocation ? failedShard.relocatingNodeId() : failedShard.currentNodeId();
RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(nodeId);
if (currentRoutingNode == null) {
// already failed (might be called several times for the same shard)
return false;
}
Iterator<MutableShardRouting> shards = currentRoutingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shardDirty = true;
if (!inRelocation) {
shard.deassignNode();
shards.remove();
} else {
shard.cancelRelocation();
}
break;
}
}
if (!shardDirty) {
return false;
}
// make sure we ignore this shard on the relevant node
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
// if in relocation no need to find a new target, just cancel the relocation.
if (inRelocation) {
return true; // lets true, so we reroute in this case
}
// add the failed shard to the unassigned shards
allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
null, failedShard.primary(), ShardRoutingState.UNASSIGNED));
return true;
}
}

View File

@ -79,10 +79,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation {
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
for (ShardRouting shardRouting : allocation.failedShards()) {
cachedCommitPoints.remove(shardRouting.shardId());
cachedStores.remove(shardRouting.shardId());
}
cachedCommitPoints.remove(allocation.failedShard().shardId());
cachedStores.remove(allocation.failedShard().shardId());
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {

View File

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.*;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -45,8 +44,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
/**
* @author kimchy (shay.banon)
*/
@ -79,82 +76,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
}
@Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) {
for (ShardRouting shardRouting : allocation.failedShards()) {
cachedStores.remove(shardRouting.shardId());
}
TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards nodesState = null;
for (ShardRouting failedShard : allocation.failedShards()) {
// this is an API allocation, ignore since we know there is no data...
if (!allocation.routingNodes().routingTable().index(failedShard.index()).shard(failedShard.id()).allocatedPostApi()) {
continue;
}
// we are still in the initial allocation, find another node with existing shards
// all primary are unassigned for the index, see if we can allocate it on existing nodes, if not, don't assign
if (nodesState == null) {
Set<String> nodesIds = Sets.newHashSet();
nodesIds.addAll(allocation.nodes().dataNodes().keySet());
nodesState = listGatewayStartedShards.list(nodesIds, null).actionGet();
if (nodesState.failures().length > 0) {
StringBuilder sb = new StringBuilder("failures when trying to list started shards on nodes:");
for (int i = 0; i < nodesState.failures().length; i++) {
Throwable cause = ExceptionsHelper.unwrapCause(nodesState.failures()[i]);
if (cause instanceof ConnectTransportException) {
continue;
}
sb.append("\n -> ").append(nodesState.failures()[i].getDetailedMessage());
}
logger.warn(sb.toString());
}
}
// make a list of ShardId to Node, each one from the latest version
Tuple<DiscoveryNode, Long> t = null;
for (TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards nodeState : nodesState) {
if (nodeState.state() == null) {
continue;
}
// we don't want to reallocate to the node we failed on
if (nodeState.node().id().equals(failedShard.currentNodeId())) {
continue;
}
// go and find
for (Map.Entry<ShardId, Long> entry : nodeState.state().shards().entrySet()) {
if (entry.getKey().equals(failedShard.shardId())) {
if (t == null || entry.getValue() > t.v2().longValue()) {
t = new Tuple<DiscoveryNode, Long>(nodeState.node(), entry.getValue());
}
}
}
}
if (t != null) {
// we found a node to allocate to, do it
RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId());
if (currentRoutingNode == null) {
// already failed (might be called several times for the same shard)
continue;
}
// find the shard and cancel relocation
Iterator<MutableShardRouting> shards = currentRoutingNode.iterator();
while (shards.hasNext()) {
MutableShardRouting shard = shards.next();
if (shard.shardId().equals(failedShard.shardId())) {
shard.deassignNode();
shards.remove();
break;
}
}
RoutingNode targetNode = allocation.routingNodes().nodesToShards().get(t.v1().id());
targetNode.add(new MutableShardRouting(failedShard.index(), failedShard.id(),
targetNode.nodeId(), failedShard.relocatingNodeId(),
failedShard.primary(), INITIALIZING));
}
}
ShardRouting failedShard = allocation.failedShard();
cachedStores.remove(failedShard.shardId());
}
@Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) {
@ -162,6 +85,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
DiscoveryNodes nodes = allocation.nodes();
RoutingNodes routingNodes = allocation.routingNodes();
// First, handle primaries, they must find a place to be allocated on here
TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards nodesState = null;
Iterator<MutableShardRouting> unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
@ -201,6 +125,10 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
if (nodeState.state() == null) {
continue;
}
// since we don't check in NO allocation, we need to double check here
if (allocation.shouldIgnoreShardForNode(shard.shardId(), nodeState.node().id())) {
continue;
}
Long version = nodeState.state().shards().get(shard.shardId());
if (version != null) {
numberOfAllocationsFound++;
@ -269,6 +197,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation {
return changed;
}
// Now, handle replicas, try to assign them to nodes that are similar to the one the primary was allocated on
unassignedIterator = routingNodes.unassigned().iterator();
while (unassignedIterator.hasNext()) {
MutableShardRouting shard = unassignedIterator.next();

View File

@ -21,15 +21,14 @@ package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.ImmutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.testng.annotations.Test;
import java.util.List;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.cluster.metadata.MetaData.*;
@ -49,7 +48,7 @@ public class FailedShardsRoutingTests {
private final ESLogger logger = Loggers.getLogger(FailedShardsRoutingTests.class);
@Test public void testFailures() {
@Test public void failPrimaryStartedCheckReplicaElected() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
@ -58,7 +57,7 @@ public class FailedShardsRoutingTests {
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(3).numberOfReplicas(1))
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
@ -80,7 +79,7 @@ public class FailedShardsRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
@ -98,7 +97,199 @@ public class FailedShardsRoutingTests {
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
}
logger.info("fail the primary shard, will have no place to be rerouted to (single node), so stays unassigned");
ShardRouting shardToFail = new ImmutableShardRouting(routingTable.index("test").shard(0).primaryShard());
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShard(clusterState, shardToFail).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shard(0).size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), not(equalTo(shardToFail.currentNodeId())));
assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(UNASSIGNED));
logger.info("fail the shard again, check that nothing happens");
assertThat(strategy.applyFailedShard(clusterState, shardToFail).changed(), equalTo(false));
}
@Test public void firstAllocationFailureSingleNode() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("Adding single node and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
}
logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned");
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
}
logger.info("fail the shard again, see that nothing happens");
assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).changed(), equalTo(false));
}
@Test public void firstAllocationFailureTwoNodes() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("Adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
}
logger.info("fail the first shard, will start INITIALIZING on the second node");
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(1));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node2"));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
}
logger.info("fail the shard again, see that nothing happens");
assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).changed(), equalTo(false));
}
@Test public void rebalanceFailure() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.build());
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(2).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
logger.info("Adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
logger.info("Start the shards (primaries)");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(2));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
}
logger.info("Start the shards (backups)");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(2));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
@ -117,179 +308,30 @@ public class FailedShardsRoutingTests {
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(3));
assertThat(routingTable.index("test").shards().size(), equalTo(2));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), lessThan(3));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(3));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), lessThan(3));
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2));
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1));
logger.info("Fail the shards on node 3");
ShardRouting shardToFail = routingNodes.node("node3").shards().get(0);
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable();
routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting(shardToFail)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(3));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(3));
assertThat(routingNodes.node("node3"), nullValue());
logger.info("Do another reroute, should try and assign again to node 3");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(3));
assertThat(routingTable.index("test").shards().size(), equalTo(2));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), lessThan(3));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(3));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), lessThan(3));
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2));
logger.info("Start the shards on node 3");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(3));
assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(2));
assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(2));
}
@Test public void test10ShardsWith1ReplicaFailure() {
ShardsAllocation strategy = new ShardsAllocation(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 10)
.build());
logger.info("Building initial routing table");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(10).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.index("test").shards().size(), equalTo(10));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue());
}
logger.info("Adding one node and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
logger.info("Add another node and perform rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
// nothing will change, since primary shards have not started yet
assertThat(prevRoutingTable == routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
logger.info("Start the primary shards");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2")));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1")));
}
logger.info("Reroute, nothing should change");
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
assertThat(prevRoutingTable == routingTable, equalTo(true));
logger.info("Fail backup shards on node2");
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
List<MutableShardRouting> failedShards = routingNodes.node("node2").shardsWithState(INITIALIZING);
routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
// fail them again...
routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertThat(prevRoutingTable != routingTable, equalTo(true));
assertThat(routingTable.index("test").shards().size(), equalTo(10));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1"));
assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1));
// backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue());
}
assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1));
// make sure the failedShard is not INITIALIZING again on node3
assertThat(routingNodes.node("node3").shards().get(0).shardId(), not(equalTo(shardToFail.shardId())));
}
}

View File

@ -193,7 +193,7 @@ public class SingleShardNoReplicasRoutingTests {
logger.info("Marking the shard as failed");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
routingTable = strategy.applyFailedShard(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING).get(0)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
assertThat(prevRoutingTable != routingTable, equalTo(true));