From 7d8726a5e81ba5c3d42182ffdb6b1d413a025b9f Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 10 Apr 2011 21:59:59 +0300 Subject: [PATCH] Better handling of shard failures, closes #845. --- .../action/shard/ShardStateAction.java | 14 +- .../allocation/FailedRerouteAllocation.java | 12 +- .../routing/allocation/NodeAllocations.java | 5 + .../routing/allocation/RoutingAllocation.java | 17 + .../routing/allocation/ShardsAllocation.java | 143 +++---- .../BlobReuseExistingNodeAllocation.java | 6 +- .../local/LocalGatewayNodeAllocation.java | 87 +--- .../allocation/FailedShardsRoutingTests.java | 380 ++++++++++-------- .../SingleShardNoReplicasRoutingTests.java | 2 +- 9 files changed, 321 insertions(+), 345 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index 97c527962cb..a5462ab768e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -107,20 +107,16 @@ public class ShardStateAction extends AbstractComponent { logger.warn("received shard failed for {}, reason [{}]", shardRouting, reason); clusterService.submitStateUpdateTask("shard-failed (" + shardRouting + "), reason [" + reason + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - RoutingTable routingTable = currentState.routingTable(); - IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index()); - // if there is no routing table, the index has been deleted while it was being allocated - // which is fine, we should just ignore this - if (indexRoutingTable == null) { + if (logger.isDebugEnabled()) { + logger.debug("Received failed shard {}, reason [{}]", shardRouting, reason); + } + RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShard(currentState, shardRouting); + if (!routingResult.changed()) { return currentState; } if (logger.isDebugEnabled()) { logger.debug("Applying failed shard {}, reason [{}]", shardRouting, reason); } - RoutingAllocation.Result routingResult = shardsAllocation.applyFailedShards(currentState, newArrayList(shardRouting)); - if (!routingResult.changed()) { - return currentState; - } return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); } }); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java index 2be256645ef..5400468533c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java @@ -23,21 +23,19 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; -import java.util.List; - /** * @author kimchy (shay.banon) */ public class FailedRerouteAllocation extends RoutingAllocation { - private final List failedShards; + private final ShardRouting failedShard; - public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, List failedShards) { + public FailedRerouteAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes, ShardRouting failedShard) { super(routingNodes, nodes); - this.failedShards = failedShards; + this.failedShard = failedShard; } - public List failedShards() { - return failedShards; + public ShardRouting failedShard() { + return failedShard; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java index d45ce1005b5..3de79a6b1cb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocations.java @@ -84,6 +84,11 @@ public class NodeAllocations extends NodeAllocation { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { Decision ret = Decision.YES; + // first, check if its in the ignored, if so, return NO + if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) { + return Decision.NO; + } + // now, go over the registered allocations for (NodeAllocation allocation1 : allocations) { Decision decision = allocation1.canAllocate(shardRouting, node, allocation); if (decision == Decision.NO) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 073bf7e2480..d2489d5d615 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -22,6 +22,10 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.index.shard.ShardId; + +import java.util.HashMap; +import java.util.Map; /** * @author kimchy (shay.banon) @@ -61,6 +65,8 @@ public class RoutingAllocation { private final AllocationExplanation explanation = new AllocationExplanation(); + private Map ignoredShardToNodes = null; + public RoutingAllocation(RoutingNodes routingNodes, DiscoveryNodes nodes) { this.routingNodes = routingNodes; this.nodes = nodes; @@ -81,4 +87,15 @@ public class RoutingAllocation { public AllocationExplanation explanation() { return explanation; } + + public void addIgnoreShardForNode(ShardId shardId, String nodeId) { + if (ignoredShardToNodes == null) { + ignoredShardToNodes = new HashMap(); + } + ignoredShardToNodes.put(shardId, nodeId); + } + + public boolean shouldIgnoreShardForNode(ShardId shardId, String nodeId) { + return ignoredShardToNodes != null && nodeId.equals(ignoredShardToNodes.get(shardId)); + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java index 2c0ef7b64a3..cbfa434b2bb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java @@ -76,16 +76,15 @@ public class ShardsAllocation extends AbstractComponent { * *

If the same instance of the routing table is returned, then no change has been made. */ - public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List failedShards) { + public RoutingAllocation.Result applyFailedShard(ClusterState clusterState, ShardRouting failedShard) { RoutingNodes routingNodes = clusterState.routingNodes(); - FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShards); - nodeAllocations.applyFailedShards(nodeAllocations, allocation); - boolean changed = applyFailedShards(allocation); + FailedRerouteAllocation allocation = new FailedRerouteAllocation(routingNodes, clusterState.nodes(), failedShard); + boolean changed = applyFailedShard(allocation); if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation()); } - // If we reroute again, the failed shard will try and be assigned to the same node, which we do no do in the applyFailedShards -// reroute(routingNodes, clusterState.nodes()); + nodeAllocations.applyFailedShards(nodeAllocations, allocation); + reroute(allocation); return new RoutingAllocation.Result(true, new RoutingTable.Builder().updateNodes(routingNodes).build().validateRaiseException(clusterState.metaData()), allocation.explanation()); } @@ -384,83 +383,75 @@ public class ShardsAllocation extends AbstractComponent { return dirty; } - private boolean applyFailedShards(FailedRerouteAllocation allocation) { - boolean dirty = false; - // apply shards might be called several times with the same shard, ignore it - for (ShardRouting failedShard : allocation.failedShards()) { + /** + * Applies the relevant logic to handle a failed shard. Returns true if changes happened that + * require relocation. + */ + private boolean applyFailedShard(FailedRerouteAllocation allocation) { + IndexRoutingTable indexRoutingTable = allocation.routingTable().index(allocation.failedShard().index()); + if (indexRoutingTable == null) { + return false; + } - boolean shardDirty = false; - boolean inRelocation = failedShard.relocatingNodeId() != null; - if (inRelocation) { - RoutingNode routingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId()); - if (routingNode != null) { - Iterator shards = routingNode.iterator(); - while (shards.hasNext()) { - MutableShardRouting shard = shards.next(); - if (shard.shardId().equals(failedShard.shardId())) { - shardDirty = true; - shard.deassignNode(); - shards.remove(); - break; - } - } - } - } + ShardRouting failedShard = allocation.failedShard(); - String nodeId = inRelocation ? failedShard.relocatingNodeId() : failedShard.currentNodeId(); - RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(nodeId); - - if (currentRoutingNode == null) { - // already failed (might be called several times for the same shard) - continue; - } - - Iterator shards = currentRoutingNode.iterator(); - while (shards.hasNext()) { - MutableShardRouting shard = shards.next(); - if (shard.shardId().equals(failedShard.shardId())) { - shardDirty = true; - if (!inRelocation) { + boolean shardDirty = false; + boolean inRelocation = failedShard.relocatingNodeId() != null; + if (inRelocation) { + RoutingNode routingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId()); + if (routingNode != null) { + Iterator shards = routingNode.iterator(); + while (shards.hasNext()) { + MutableShardRouting shard = shards.next(); + if (shard.shardId().equals(failedShard.shardId())) { + shardDirty = true; shard.deassignNode(); shards.remove(); - } else { - shard.cancelRelocation(); + break; } - break; } } - - if (!shardDirty) { - continue; - } else { - dirty = true; - } - - // if in relocation no need to find a new target, just cancel the relocation. - if (inRelocation) { - continue; - } - - // not in relocation so find a new target. - - boolean allocated = false; - List sortedNodesLeastToHigh = allocation.routingNodes().sortedNodesLeastToHigh(); - for (RoutingNode target : sortedNodesLeastToHigh) { - if (!target.nodeId().equals(failedShard.currentNodeId()) && - nodeAllocations.canAllocate(failedShard, target, allocation).allocate()) { - target.add(new MutableShardRouting(failedShard.index(), failedShard.id(), - target.nodeId(), failedShard.relocatingNodeId(), - failedShard.primary(), INITIALIZING)); - allocated = true; - break; - } - } - if (!allocated) { - // we did not manage to allocate it, put it in the unassigned - allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(), - null, failedShard.primary(), ShardRoutingState.UNASSIGNED)); - } } - return dirty; + + String nodeId = inRelocation ? failedShard.relocatingNodeId() : failedShard.currentNodeId(); + RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(nodeId); + + if (currentRoutingNode == null) { + // already failed (might be called several times for the same shard) + return false; + } + + Iterator shards = currentRoutingNode.iterator(); + while (shards.hasNext()) { + MutableShardRouting shard = shards.next(); + if (shard.shardId().equals(failedShard.shardId())) { + shardDirty = true; + if (!inRelocation) { + shard.deassignNode(); + shards.remove(); + } else { + shard.cancelRelocation(); + } + break; + } + } + + if (!shardDirty) { + return false; + } + + // make sure we ignore this shard on the relevant node + allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId()); + + // if in relocation no need to find a new target, just cancel the relocation. + if (inRelocation) { + return true; // lets true, so we reroute in this case + } + + // add the failed shard to the unassigned shards + allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(), + null, failedShard.primary(), ShardRoutingState.UNASSIGNED)); + + return true; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java index 07511bdbb92..93c0223285c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/blobstore/BlobReuseExistingNodeAllocation.java @@ -79,10 +79,8 @@ public class BlobReuseExistingNodeAllocation extends NodeAllocation { } @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { - for (ShardRouting shardRouting : allocation.failedShards()) { - cachedCommitPoints.remove(shardRouting.shardId()); - cachedStores.remove(shardRouting.shardId()); - } + cachedCommitPoints.remove(allocation.failedShard().shardId()); + cachedStores.remove(allocation.failedShard().shardId()); } @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java index aabdb013e2f..2ec7c987bd3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -29,7 +29,6 @@ import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.*; import org.elasticsearch.common.collect.Sets; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -45,8 +44,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import static org.elasticsearch.cluster.routing.ShardRoutingState.*; - /** * @author kimchy (shay.banon) */ @@ -79,82 +76,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } @Override public void applyFailedShards(NodeAllocations nodeAllocations, FailedRerouteAllocation allocation) { - for (ShardRouting shardRouting : allocation.failedShards()) { - cachedStores.remove(shardRouting.shardId()); - } - - TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards nodesState = null; - - for (ShardRouting failedShard : allocation.failedShards()) { - // this is an API allocation, ignore since we know there is no data... - if (!allocation.routingNodes().routingTable().index(failedShard.index()).shard(failedShard.id()).allocatedPostApi()) { - continue; - } - - // we are still in the initial allocation, find another node with existing shards - // all primary are unassigned for the index, see if we can allocate it on existing nodes, if not, don't assign - if (nodesState == null) { - Set nodesIds = Sets.newHashSet(); - nodesIds.addAll(allocation.nodes().dataNodes().keySet()); - nodesState = listGatewayStartedShards.list(nodesIds, null).actionGet(); - - if (nodesState.failures().length > 0) { - StringBuilder sb = new StringBuilder("failures when trying to list started shards on nodes:"); - for (int i = 0; i < nodesState.failures().length; i++) { - Throwable cause = ExceptionsHelper.unwrapCause(nodesState.failures()[i]); - if (cause instanceof ConnectTransportException) { - continue; - } - sb.append("\n -> ").append(nodesState.failures()[i].getDetailedMessage()); - } - logger.warn(sb.toString()); - } - } - - // make a list of ShardId to Node, each one from the latest version - Tuple t = null; - for (TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards nodeState : nodesState) { - if (nodeState.state() == null) { - continue; - } - // we don't want to reallocate to the node we failed on - if (nodeState.node().id().equals(failedShard.currentNodeId())) { - continue; - } - // go and find - for (Map.Entry entry : nodeState.state().shards().entrySet()) { - if (entry.getKey().equals(failedShard.shardId())) { - if (t == null || entry.getValue() > t.v2().longValue()) { - t = new Tuple(nodeState.node(), entry.getValue()); - } - } - } - } - if (t != null) { - // we found a node to allocate to, do it - RoutingNode currentRoutingNode = allocation.routingNodes().nodesToShards().get(failedShard.currentNodeId()); - if (currentRoutingNode == null) { - // already failed (might be called several times for the same shard) - continue; - } - - // find the shard and cancel relocation - Iterator shards = currentRoutingNode.iterator(); - while (shards.hasNext()) { - MutableShardRouting shard = shards.next(); - if (shard.shardId().equals(failedShard.shardId())) { - shard.deassignNode(); - shards.remove(); - break; - } - } - - RoutingNode targetNode = allocation.routingNodes().nodesToShards().get(t.v1().id()); - targetNode.add(new MutableShardRouting(failedShard.index(), failedShard.id(), - targetNode.nodeId(), failedShard.relocatingNodeId(), - failedShard.primary(), INITIALIZING)); - } - } + ShardRouting failedShard = allocation.failedShard(); + cachedStores.remove(failedShard.shardId()); } @Override public boolean allocateUnassigned(NodeAllocations nodeAllocations, RoutingAllocation allocation) { @@ -162,6 +85,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { DiscoveryNodes nodes = allocation.nodes(); RoutingNodes routingNodes = allocation.routingNodes(); + // First, handle primaries, they must find a place to be allocated on here TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards nodesState = null; Iterator unassignedIterator = routingNodes.unassigned().iterator(); while (unassignedIterator.hasNext()) { @@ -201,6 +125,10 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { if (nodeState.state() == null) { continue; } + // since we don't check in NO allocation, we need to double check here + if (allocation.shouldIgnoreShardForNode(shard.shardId(), nodeState.node().id())) { + continue; + } Long version = nodeState.state().shards().get(shard.shardId()); if (version != null) { numberOfAllocationsFound++; @@ -269,6 +197,7 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { return changed; } + // Now, handle replicas, try to assign them to nodes that are similar to the one the primary was allocated on unassignedIterator = routingNodes.unassigned().iterator(); while (unassignedIterator.hasNext()) { MutableShardRouting shard = unassignedIterator.next(); diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index 2d513275e85..e7cbd154982 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -21,15 +21,14 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.ImmutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.testng.annotations.Test; -import java.util.List; - import static org.elasticsearch.cluster.ClusterState.*; import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.cluster.metadata.MetaData.*; @@ -49,7 +48,7 @@ public class FailedShardsRoutingTests { private final ESLogger logger = Loggers.getLogger(FailedShardsRoutingTests.class); - @Test public void testFailures() { + @Test public void failPrimaryStartedCheckReplicaElected() { ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) .put("cluster.routing.allocation.allow_rebalance", "always") @@ -58,7 +57,7 @@ public class FailedShardsRoutingTests { logger.info("Building initial routing table"); MetaData metaData = newMetaDataBuilder() - .put(newIndexMetaDataBuilder("test").numberOfShards(3).numberOfReplicas(1)) + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) .build(); RoutingTable routingTable = routingTable() @@ -80,7 +79,7 @@ public class FailedShardsRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shards().size(), equalTo(1)); for (int i = 0; i < routingTable.index("test").shards().size(); i++) { assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); @@ -98,7 +97,199 @@ public class FailedShardsRoutingTests { clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shards().size(), equalTo(1)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2"))); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1"))); + } + + logger.info("fail the primary shard, will have no place to be rerouted to (single node), so stays unassigned"); + ShardRouting shardToFail = new ImmutableShardRouting(routingTable.index("test").shard(0).primaryShard()); + prevRoutingTable = routingTable; + routingTable = strategy.applyFailedShard(clusterState, shardToFail).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shard(0).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(0).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), not(equalTo(shardToFail.currentNodeId()))); + assertThat(routingTable.index("test").shard(0).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2"))); + assertThat(routingTable.index("test").shard(0).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(0).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + + logger.info("fail the shard again, check that nothing happens"); + assertThat(strategy.applyFailedShard(clusterState, shardToFail).changed(), equalTo(false)); + } + + @Test public void firstAllocationFailureSingleNode() { + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("Adding single node and performing rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(1)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned"); + prevRoutingTable = routingTable; + routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(1)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + logger.info("fail the shard again, see that nothing happens"); + assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).changed(), equalTo(false)); + } + + @Test public void firstAllocationFailureTwoNodes() { + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("Adding two nodes and performing rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(1)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + logger.info("fail the first shard, will start INITIALIZING on the second node"); + prevRoutingTable = routingTable; + routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(1)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node2")); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + logger.info("fail the shard again, see that nothing happens"); + assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).changed(), equalTo(false)); + } + + @Test public void rebalanceFailure() { + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test").numberOfShards(2).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("Adding two nodes and performing rerouting"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + logger.info("Start the shards (primaries)"); + RoutingNodes routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(2)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2"))); + assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1"))); + } + + logger.info("Start the shards (backups)"); + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + assertThat(prevRoutingTable != routingTable, equalTo(true)); + assertThat(routingTable.index("test").shards().size(), equalTo(2)); for (int i = 0; i < routingTable.index("test").shards().size(); i++) { assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); @@ -117,179 +308,30 @@ public class FailedShardsRoutingTests { routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(3)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(3)); + assertThat(routingTable.index("test").shards().size(), equalTo(2)); + assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2)); assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), lessThan(3)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(3)); + assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2)); assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), lessThan(3)); - assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2)); + assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1)); + logger.info("Fail the shards on node 3"); + ShardRouting shardToFail = routingNodes.node("node3").shards().get(0); routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable(); + routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting(shardToFail)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(3)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(3)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(3)); - assertThat(routingNodes.node("node3"), nullValue()); - - logger.info("Do another reroute, should try and assign again to node 3"); - prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState).routingTable(); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = clusterState.routingNodes(); - - assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(3)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(3)); + assertThat(routingTable.index("test").shards().size(), equalTo(2)); + assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2)); assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), lessThan(3)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(3)); + assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED, RELOCATING), equalTo(2)); assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), lessThan(3)); - assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(2)); - - logger.info("Start the shards on node 3"); - routingNodes = clusterState.routingNodes(); - prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node3").shardsWithState(INITIALIZING)).routingTable(); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = clusterState.routingNodes(); - - assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(3)); - assertThat(routingNodes.node("node1").numberOfShardsWithState(STARTED), equalTo(2)); - assertThat(routingNodes.node("node2").numberOfShardsWithState(STARTED), equalTo(2)); - assertThat(routingNodes.node("node3").numberOfShardsWithState(STARTED), equalTo(2)); - } - - @Test public void test10ShardsWith1ReplicaFailure() { - ShardsAllocation strategy = new ShardsAllocation(settingsBuilder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 10) - .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) - .build()); - - logger.info("Building initial routing table"); - - MetaData metaData = newMetaDataBuilder() - .put(newIndexMetaDataBuilder("test").numberOfShards(10).numberOfReplicas(1)) - .build(); - - RoutingTable routingTable = routingTable() - .add(indexRoutingTable("test").initializeEmpty(metaData.index("test"))) - .build(); - - ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); - - assertThat(routingTable.index("test").shards().size(), equalTo(10)); - for (int i = 0; i < routingTable.index("test").shards().size(); i++) { - assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); - assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); - assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue()); - assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue()); - } - - logger.info("Adding one node and performing rerouting"); - clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1"))).build(); - - RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState).routingTable(); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - - assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(10)); - for (int i = 0; i < routingTable.index("test").shards().size(); i++) { - assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); - assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); - assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue()); - } - - logger.info("Add another node and perform rerouting"); - clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().putAll(clusterState.nodes()).put(newNode("node2"))).build(); - prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState).routingTable(); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - - // nothing will change, since primary shards have not started yet - assertThat(prevRoutingTable == routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(10)); - for (int i = 0; i < routingTable.index("test").shards().size(); i++) { - assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); - assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); - assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue()); - } - - logger.info("Start the primary shards"); - RoutingNodes routingNodes = clusterState.routingNodes(); - prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - - assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(10)); - for (int i = 0; i < routingTable.index("test").shards().size(); i++) { - assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), anyOf(equalTo("node1"), equalTo("node2"))); - assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), anyOf(equalTo("node2"), equalTo("node1"))); - } - - logger.info("Reroute, nothing should change"); - prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState).routingTable(); - assertThat(prevRoutingTable == routingTable, equalTo(true)); - - logger.info("Fail backup shards on node2"); - routingNodes = clusterState.routingNodes(); - prevRoutingTable = routingTable; - List failedShards = routingNodes.node("node2").shardsWithState(INITIALIZING); - routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable(); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = clusterState.routingNodes(); - - assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(10)); - for (int i = 0; i < routingTable.index("test").shards().size(); i++) { - assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); - assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue()); - } - - // fail them again... - routingTable = strategy.applyFailedShards(clusterState, failedShards).routingTable(); - clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); - routingNodes = clusterState.routingNodes(); - - assertThat(prevRoutingTable != routingTable, equalTo(true)); - assertThat(routingTable.index("test").shards().size(), equalTo(10)); - for (int i = 0; i < routingTable.index("test").shards().size(); i++) { - assertThat(routingTable.index("test").shard(i).size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(2)); - assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); - assertThat(routingTable.index("test").shard(i).primaryShard().currentNodeId(), equalTo("node1")); - assertThat(routingTable.index("test").shard(i).replicaShards().size(), equalTo(1)); - // backup shards are initializing as well, we make sure that they recover from primary *started* shards in the IndicesClusterStateService - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); - assertThat(routingTable.index("test").shard(i).replicaShards().get(0).currentNodeId(), nullValue()); - } + assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1)); + // make sure the failedShard is not INITIALIZING again on node3 + assertThat(routingNodes.node("node3").shards().get(0).shardId(), not(equalTo(shardToFail.shardId()))); } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java index f3565c70305..491af02aa56 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/SingleShardNoReplicasRoutingTests.java @@ -193,7 +193,7 @@ public class SingleShardNoReplicasRoutingTests { logger.info("Marking the shard as failed"); RoutingNodes routingNodes = clusterState.routingNodes(); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); + routingTable = strategy.applyFailedShard(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING).get(0)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); assertThat(prevRoutingTable != routingTable, equalTo(true));