Revert "Improve handling of failed primary replica handling"

This reverts commit 75ed24f6b6.
This commit is contained in:
Simon Willnauer 2014-07-10 21:30:15 +02:00
parent 107534c062
commit bb964e7817
3 changed files with 43 additions and 132 deletions

View File: RoutingNodes.java

@@ -310,12 +310,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
         for (RoutingNode routingNode : this) {
             shards.addAll(routingNode.shardsWithState(state));
         }
-        for (ShardRoutingState s : state) {
-            if (s == ShardRoutingState.UNASSIGNED) {
-                Iterables.addAll(shards, unassigned());
-                break;
-            }
-        }
         return shards;
     }
@@ -325,16 +319,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
         for (RoutingNode routingNode : this) {
             shards.addAll(routingNode.shardsWithState(index, state));
         }
-        for (ShardRoutingState s : state) {
-            if (s == ShardRoutingState.UNASSIGNED) {
-                for (MutableShardRouting unassignedShard : unassignedShards) {
-                    if (unassignedShard.index().equals(index)) {
-                        shards.add(unassignedShard);
-                    }
-                }
-                break;
-            }
-        }
         return shards;
     }

View File: AllocationService.java

@@ -192,7 +192,7 @@ public class AllocationService extends AbstractComponent {
         // elect primaries *before* allocating unassigned, so backups of primaries that failed
         // will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
-        changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
+        changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
         if (!changed) {
             return new RoutingAllocation.Result(false, clusterState.routingTable());
@@ -210,13 +210,13 @@ public class AllocationService extends AbstractComponent {
         // elect primaries *before* allocating unassigned, so backups of primaries that failed
         // will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
-        changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
+        changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
         // now allocate all the unassigned to available nodes
         if (allocation.routingNodes().hasUnassigned()) {
             changed |= shardsAllocators.allocateUnassigned(allocation);
             // elect primaries again, in case this is needed with unassigned allocation
-            changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
+            changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
         }
         // move shards that no longer can be allocated
@@ -269,31 +269,13 @@ public class AllocationService extends AbstractComponent {
         return changed;
     }

-    private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
+    private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) {
         boolean changed = false;
         RoutingNodes routingNodes = allocation.routingNodes();
         if (!routingNodes.hasUnassignedPrimaries()) {
             // move out if we don't have unassigned primaries
             return changed;
         }
-        // go over and remove dangling replicas that are initializing for primary shards
-        List<ShardRouting> shardsToFail = Lists.newArrayList();
-        for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
-            if (shardEntry.primary()) {
-                for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
-                    if (!routing.primary() && routing.initializing()) {
-                        shardsToFail.add(routing);
-                    }
-                }
-            }
-        }
-        for (ShardRouting shardToFail : shardsToFail) {
-            changed |= applyFailedShard(allocation, shardToFail, false);
-        }
-        // now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
-        // routingNodes.hasUnassignedPrimaries() will potentially be false
         for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
             if (shardEntry.primary()) {
                 MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
@@ -316,6 +298,28 @@ public class AllocationService extends AbstractComponent {
             }
         }
+        // go over and remove dangling replicas that are initializing, but we couldn't elect primary ones...
+        List<ShardRouting> shardsToFail = null;
+        if (routingNodes.hasUnassignedPrimaries()) {
+            for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
+                if (shardEntry.primary()) {
+                    for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
+                        if (!routing.primary()) {
+                            changed = true;
+                            if (shardsToFail == null) {
+                                shardsToFail = new ArrayList<>();
+                            }
+                            shardsToFail.add(routing);
+                        }
+                    }
+                }
+            }
+            if (shardsToFail != null) {
+                for (ShardRouting shardToFail : shardsToFail) {
+                    applyFailedShard(allocation, shardToFail, false);
+                }
+            }
+        }
         return changed;
     }
@@ -417,6 +421,23 @@ public class AllocationService extends AbstractComponent {
         RoutingNodes routingNodes = allocation.routingNodes();
         boolean dirty = false;
+        if (failedShard.primary()) {
+            // we have to fail the initializing replicas if the primary fails
+            // since they might now yet have started the recovery and then they will
+            // stick in the cluster-state forever since the replica has a retry logic that
+            // retries infinitely in that case.
+            List<MutableShardRouting> initializingReplicas = new ArrayList<>();
+            for (MutableShardRouting shard : routingNodes.assignedShards(failedShard)){
+                if (!shard.primary() && shard.initializing()) {
+                    initializingReplicas.add(shard);
+                }
+            }
+            // we can't do this in the loop above since we modify the iterator and will get
+            // concurrent modification exceptions
+            for (MutableShardRouting shard : initializingReplicas) {
+                dirty |= applyFailedShard(allocation, shard, addToIgnoreList);
+            }
+        }
         if (failedShard.relocatingNodeId() != null) {
             // the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
             if (failedShard.state() == INITIALIZING) {

View File: FailedShardsRoutingTests.java

@@ -19,7 +19,6 @@
 package org.elasticsearch.cluster.routing.allocation;

-import com.google.common.collect.ImmutableList;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
@@ -483,97 +482,4 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
         // make sure the failedShard is not INITIALIZING again on node3
         assertThat(routingNodes.node("node3").get(0).shardId(), not(equalTo(shardToFail.shardId())));
     }
-    @Test
-    public void testFailAllReplicasInitializingOnPrimaryFail() {
-        AllocationService allocation = createAllocationService(settingsBuilder()
-                .build());
-        MetaData metaData = MetaData.builder()
-                .put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(2))
-                .build();
-        RoutingTable routingTable = RoutingTable.builder()
-                .addAsNew(metaData.index("test"))
-                .build();
-        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
-        // add 4 nodes
-        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
-        clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState).routingTable()).build();
-        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
-        assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
-        // start primary shards
-        clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
-        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
-        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
-        // fail the primary shard, check replicas get removed as well...
-        ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        RoutingAllocation.Result routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
-        assertThat(routingResult.changed(), equalTo(true));
-        clusterState = ClusterState.builder(clusterState).routingTable(routingResult.routingTable()).build();
-        // the primary gets allocated on another node, replicas are unassigned
-        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
-        assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
-        ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
-        assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
-        // start the primary shard
-        clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
-        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
-        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
-        // simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it
-        routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
-        assertThat(routingResult.changed(), equalTo(false));
-    }
-    @Test
-    public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToElect() {
-        AllocationService allocation = createAllocationService(settingsBuilder()
-                .build());
-        MetaData metaData = MetaData.builder()
-                .put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(2))
-                .build();
-        RoutingTable routingTable = RoutingTable.builder()
-                .addAsNew(metaData.index("test"))
-                .build();
-        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
-        // add 4 nodes
-        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
-        clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState).routingTable()).build();
-        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
-        assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
-        // start primary shards
-        clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
-        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
-        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
-        // start another replica shard, while keep one initializing
-        clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, ImmutableList.of(clusterState.routingNodes().shardsWithState(INITIALIZING).get(0))).routingTable()).build();
-        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2));
-        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
-        // fail the primary shard, check one replica gets elected to primary, others become INITIALIZING (from it)
-        ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
-        RoutingAllocation.Result routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
-        assertThat(routingResult.changed(), equalTo(true));
-        clusterState = ClusterState.builder(clusterState).routingTable(routingResult.routingTable()).build();
-        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
-        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
-        ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
-        assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
-        // simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it
-        routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
-        assertThat(routingResult.changed(), equalTo(false));
-    }
 }