Elect primaries and unassign dangling replicas.

AllocationService: electPrimaries(RoutingNodes) becomes
electPrimariesAndUnassignDanglingReplicas(RoutingAllocation). After the election pass it
collects every non-active shard copy that still sits on a node while the primary of the same
shard id remains unassigned, and passes each one to applyFailedShard(..., false).
applyFailedShard gains an addToIgnoreList flag that guards the addIgnoreShardForNode calls.
PrimaryElectionRoutingTests: adds testRemovingInitializingReplicasIfPrimariesFails.

NOTE(review): line breaks, blank context lines and indentation were reconstructed from the
hunk line counts, and the generic type arguments on List/ArrayList were restored; confirm
against the target file before applying.

diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index fe8cb6c61ce..6269fbcf3aa 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -20,7 +20,6 @@
 package org.elasticsearch.cluster.routing.allocation;
 
 import com.google.common.collect.Lists;
-
 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.ElasticSearchIllegalStateException;
 import org.elasticsearch.cluster.ClusterState;
@@ -103,7 +102,7 @@ public class AllocationService extends AbstractComponent {
         // shuffle the unassigned nodes, just so we won't have things like poison failed shards
         Collections.shuffle(routingNodes.unassigned());
         FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShard);
-        boolean changed = applyFailedShard(allocation, failedShard);
+        boolean changed = applyFailedShard(allocation, failedShard, true);
         if (!changed) {
             return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
         }
@@ -165,7 +164,7 @@ public class AllocationService extends AbstractComponent {
 
         // elect primaries *before* allocating unassigned, so backups of primaries that failed
         // will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
-        changed |= electPrimaries(allocation.routingNodes());
+        changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
 
         if (!changed) {
             return new RoutingAllocation.Result(false, clusterState.routingTable(), allocation.explanation());
@@ -185,13 +184,13 @@ public class AllocationService extends AbstractComponent {
 
         // elect primaries *before* allocating unassigned, so backups of primaries that failed
         // will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
-        changed |= electPrimaries(allocation.routingNodes());
+        changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
 
         // now allocate all the unassigned to available nodes
         if (allocation.routingNodes().hasUnassigned()) {
             changed |= shardsAllocators.allocateUnassigned(allocation);
             // elect primaries again, in case this is needed with unassigned allocation
-            changed |= electPrimaries(allocation.routingNodes());
+            changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
         }
 
         // move shards that no longer can be allocated
@@ -242,8 +241,9 @@ public class AllocationService extends AbstractComponent {
         return changed;
     }
 
-    private boolean electPrimaries(RoutingNodes routingNodes) {
+    private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) {
         boolean changed = false;
+        RoutingNodes routingNodes = allocation.routingNodes();
         for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
             if (shardEntry.primary() && !shardEntry.assignedToNode()) {
                 boolean elected = false;
@@ -283,6 +283,29 @@ public class AllocationService extends AbstractComponent {
                 }
             }
         }
+
+        // go over and remove dangling replicas that are initializing, but we couldn't elect primary ones...
+        List<ShardRouting> shardsToFail = null;
+        for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
+            if (shardEntry.primary() && !shardEntry.assignedToNode()) {
+                for (RoutingNode routingNode : routingNodes.nodesToShards().values()) {
+                    for (MutableShardRouting shardEntry2 : routingNode.shards()) {
+                        if (shardEntry.shardId().equals(shardEntry2.shardId()) && !shardEntry2.active()) {
+                            changed = true;
+                            if (shardsToFail == null) {
+                                shardsToFail = new ArrayList<ShardRouting>();
+                            }
+                            shardsToFail.add(shardEntry2);
+                        }
+                    }
+                }
+            }
+        }
+        if (shardsToFail != null) {
+            for (ShardRouting shardToFail : shardsToFail) {
+                applyFailedShard(allocation, shardToFail, false);
+            }
+        }
         return changed;
     }
 
@@ -310,8 +333,7 @@ public class AllocationService extends AbstractComponent {
             changed = true;
             // now, go over all the shards routing on the node, and fail them
             for (MutableShardRouting shardRouting : new ArrayList<MutableShardRouting>(node.shards())) {
-                // we create a copy of the shard routing, since applyFailedShard assumes its a new copy
-                applyFailedShard(allocation, shardRouting);
+                applyFailedShard(allocation, shardRouting, false);
             }
             // its a dead node, remove it, note, its important to remove it *after* we apply failed shard
             // since it relies on the fact that the RoutingNode exists in the list of nodes
@@ -372,7 +394,7 @@ public class AllocationService extends AbstractComponent {
      * Applies the relevant logic to handle a failed shard. Returns true if changes happened that
      * require relocation.
      */
-    private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard) {
+    private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList) {
         // create a copy of the failed shard, since we assume we can change possible refernces to it without
         // changing the state of failed shard
         failedShard = new ImmutableShardRouting(failedShard);
@@ -397,8 +419,10 @@ public class AllocationService extends AbstractComponent {
                             it.remove();
                             shardRouting.deassignNode();
 
-                            // make sure we ignore this shard on the relevant node
-                            allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                            if (addToIgnoreList) {
+                                // make sure we ignore this shard on the relevant node
+                                allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                            }
 
                             break;
                         }
@@ -433,8 +457,10 @@ public class AllocationService extends AbstractComponent {
                             dirty = true;
                             shardRouting.cancelRelocation();
                             it.remove();
-                            // make sure we ignore this shard on the relevant node
-                            allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                            if (addToIgnoreList) {
+                                // make sure we ignore this shard on the relevant node
+                                allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                            }
 
                             allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
                                     null, failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1));
@@ -469,8 +495,10 @@ public class AllocationService extends AbstractComponent {
                     MutableShardRouting shardRouting = it.next();
                     if (shardRouting.equals(failedShard)) {
                         dirty = true;
-                        // make sure we ignore this shard on the relevant node
-                        allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                        if (addToIgnoreList) {
+                            // make sure we ignore this shard on the relevant node
+                            allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
+                        }
 
                         it.remove();
 
diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/PrimaryElectionRoutingTests.java
index 4712bdd31de..b7122d77e2a 100644
--- a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/PrimaryElectionRoutingTests.java
+++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/PrimaryElectionRoutingTests.java
@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
 import org.testng.annotations.Test;
@@ -98,4 +99,51 @@ public class PrimaryElectionRoutingTests {
         assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2"));
         assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node3"));
     }
+
+    @Test
+    public void testRemovingInitializingReplicasIfPrimariesFails() {
+        AllocationService allocation = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
+
+        logger.info("Building initial routing table");
+
+        MetaData metaData = newMetaDataBuilder()
+                .put(newIndexMetaDataBuilder("test").numberOfShards(2).numberOfReplicas(1))
+                .build();
+
+        RoutingTable routingTable = routingTable()
+                .addAsNew(metaData.index("test"))
+                .build();
+
+        ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
+
+        logger.info("Adding two nodes and performing rerouting");
+        clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
+        RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
+
+        logger.info("Start the primary shards");
+        RoutingNodes routingNodes = clusterState.routingNodes();
+        rerouteResult = allocation.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
+        routingNodes = clusterState.routingNodes();
+
+        assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(2));
+        assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(2));
+
+        // now, fail one node, while the replica is initializing, and it also holds a primary
+        logger.info("--> fail node with primary");
+        String nodeIdToFail = clusterState.routingTable().index("test").shard(0).primaryShard().currentNodeId();
+        String nodeIdRemaining = nodeIdToFail.equals("node1") ? "node2" : "node1";
+        clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder()
+                .put(newNode(nodeIdRemaining))
+        ).build();
+        rerouteResult = allocation.reroute(clusterState);
+        clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
+        routingNodes = clusterState.routingNodes();
+
+        assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1));
+        assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(1));
+        assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(INITIALIZING).get(0).primary(), equalTo(true));
+
+    }
 }