Improve handling of failed primary replica handling
Out of #6808, we improved the handling of a primary failing to make sure replicas that are initializing are properly failed as well. After double checking it, it has 2 problems, the first, if the same shard routing is failed again, there is no protection that we don't apply the failure (which we do in failed shard cases), and the other was that we already tried to handle it (wrongly) in the elect primary method. This change fixes the handling to work correctly in the elect primary method, and adds unit tests to verify the behavior closes #6816
This commit is contained in:
parent
4f131dfffb
commit
75ed24f6b6
|
@ -310,6 +310,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
for (RoutingNode routingNode : this) {
|
for (RoutingNode routingNode : this) {
|
||||||
shards.addAll(routingNode.shardsWithState(state));
|
shards.addAll(routingNode.shardsWithState(state));
|
||||||
}
|
}
|
||||||
|
for (ShardRoutingState s : state) {
|
||||||
|
if (s == ShardRoutingState.UNASSIGNED) {
|
||||||
|
Iterables.addAll(shards, unassigned());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
return shards;
|
return shards;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,6 +325,16 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
for (RoutingNode routingNode : this) {
|
for (RoutingNode routingNode : this) {
|
||||||
shards.addAll(routingNode.shardsWithState(index, state));
|
shards.addAll(routingNode.shardsWithState(index, state));
|
||||||
}
|
}
|
||||||
|
for (ShardRoutingState s : state) {
|
||||||
|
if (s == ShardRoutingState.UNASSIGNED) {
|
||||||
|
for (MutableShardRouting unassignedShard : unassignedShards) {
|
||||||
|
if (unassignedShard.index().equals(index)) {
|
||||||
|
shards.add(unassignedShard);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
return shards;
|
return shards;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -192,7 +192,7 @@ public class AllocationService extends AbstractComponent {
|
||||||
|
|
||||||
// elect primaries *before* allocating unassigned, so backups of primaries that failed
|
// elect primaries *before* allocating unassigned, so backups of primaries that failed
|
||||||
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
|
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
|
||||||
changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
|
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
|
||||||
|
|
||||||
if (!changed) {
|
if (!changed) {
|
||||||
return new RoutingAllocation.Result(false, clusterState.routingTable());
|
return new RoutingAllocation.Result(false, clusterState.routingTable());
|
||||||
|
@ -210,13 +210,13 @@ public class AllocationService extends AbstractComponent {
|
||||||
|
|
||||||
// elect primaries *before* allocating unassigned, so backups of primaries that failed
|
// elect primaries *before* allocating unassigned, so backups of primaries that failed
|
||||||
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
|
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
|
||||||
changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
|
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
|
||||||
|
|
||||||
// now allocate all the unassigned to available nodes
|
// now allocate all the unassigned to available nodes
|
||||||
if (allocation.routingNodes().hasUnassigned()) {
|
if (allocation.routingNodes().hasUnassigned()) {
|
||||||
changed |= shardsAllocators.allocateUnassigned(allocation);
|
changed |= shardsAllocators.allocateUnassigned(allocation);
|
||||||
// elect primaries again, in case this is needed with unassigned allocation
|
// elect primaries again, in case this is needed with unassigned allocation
|
||||||
changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
|
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
|
||||||
}
|
}
|
||||||
|
|
||||||
// move shards that no longer can be allocated
|
// move shards that no longer can be allocated
|
||||||
|
@ -269,13 +269,31 @@ public class AllocationService extends AbstractComponent {
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) {
|
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
|
||||||
boolean changed = false;
|
boolean changed = false;
|
||||||
RoutingNodes routingNodes = allocation.routingNodes();
|
RoutingNodes routingNodes = allocation.routingNodes();
|
||||||
if (!routingNodes.hasUnassignedPrimaries()) {
|
if (!routingNodes.hasUnassignedPrimaries()) {
|
||||||
// move out if we don't have unassigned primaries
|
// move out if we don't have unassigned primaries
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// go over and remove dangling replicas that are initializing for primary shards
|
||||||
|
List<ShardRouting> shardsToFail = Lists.newArrayList();
|
||||||
|
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
|
||||||
|
if (shardEntry.primary()) {
|
||||||
|
for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
|
||||||
|
if (!routing.primary() && routing.initializing()) {
|
||||||
|
shardsToFail.add(routing);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (ShardRouting shardToFail : shardsToFail) {
|
||||||
|
changed |= applyFailedShard(allocation, shardToFail, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
// now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
|
||||||
|
// routingNodes.hasUnassignedPrimaries() will potentially be false
|
||||||
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
|
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
|
||||||
if (shardEntry.primary()) {
|
if (shardEntry.primary()) {
|
||||||
MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
|
MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
|
||||||
|
@ -298,28 +316,6 @@ public class AllocationService extends AbstractComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// go over and remove dangling replicas that are initializing, but we couldn't elect primary ones...
|
|
||||||
List<ShardRouting> shardsToFail = null;
|
|
||||||
if (routingNodes.hasUnassignedPrimaries()) {
|
|
||||||
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
|
|
||||||
if (shardEntry.primary()) {
|
|
||||||
for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
|
|
||||||
if (!routing.primary()) {
|
|
||||||
changed = true;
|
|
||||||
if (shardsToFail == null) {
|
|
||||||
shardsToFail = new ArrayList<>();
|
|
||||||
}
|
|
||||||
shardsToFail.add(routing);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (shardsToFail != null) {
|
|
||||||
for (ShardRouting shardToFail : shardsToFail) {
|
|
||||||
applyFailedShard(allocation, shardToFail, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -421,23 +417,6 @@ public class AllocationService extends AbstractComponent {
|
||||||
|
|
||||||
RoutingNodes routingNodes = allocation.routingNodes();
|
RoutingNodes routingNodes = allocation.routingNodes();
|
||||||
boolean dirty = false;
|
boolean dirty = false;
|
||||||
if (failedShard.primary()) {
|
|
||||||
// we have to fail the initializing replicas if the primary fails
|
|
||||||
// since they might now yet have started the recovery and then they will
|
|
||||||
// stick in the cluster-state forever since the replica has a retry logic that
|
|
||||||
// retries infinitely in that case.
|
|
||||||
List<MutableShardRouting> initializingReplicas = new ArrayList<>();
|
|
||||||
for (MutableShardRouting shard : routingNodes.assignedShards(failedShard)){
|
|
||||||
if (!shard.primary() && shard.initializing()) {
|
|
||||||
initializingReplicas.add(shard);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// we can't do this in the loop above since we modify the iterator and will get
|
|
||||||
// concurrent modification exceptions
|
|
||||||
for (MutableShardRouting shard : initializingReplicas) {
|
|
||||||
dirty |= applyFailedShard(allocation, shard, addToIgnoreList);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (failedShard.relocatingNodeId() != null) {
|
if (failedShard.relocatingNodeId() != null) {
|
||||||
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
|
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
|
||||||
if (failedShard.state() == INITIALIZING) {
|
if (failedShard.state() == INITIALIZING) {
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.routing.allocation;
|
package org.elasticsearch.cluster.routing.allocation;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
@ -482,4 +483,97 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
|
||||||
// make sure the failedShard is not INITIALIZING again on node3
|
// make sure the failedShard is not INITIALIZING again on node3
|
||||||
assertThat(routingNodes.node("node3").get(0).shardId(), not(equalTo(shardToFail.shardId())));
|
assertThat(routingNodes.node("node3").get(0).shardId(), not(equalTo(shardToFail.shardId())));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailAllReplicasInitializingOnPrimaryFail() {
|
||||||
|
AllocationService allocation = createAllocationService(settingsBuilder()
|
||||||
|
.build());
|
||||||
|
|
||||||
|
MetaData metaData = MetaData.builder()
|
||||||
|
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(2))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
RoutingTable routingTable = RoutingTable.builder()
|
||||||
|
.addAsNew(metaData.index("test"))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
|
||||||
|
|
||||||
|
// add 4 nodes
|
||||||
|
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
|
||||||
|
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState).routingTable()).build();
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
|
||||||
|
// start primary shards
|
||||||
|
clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
|
||||||
|
|
||||||
|
// fail the primary shard, check replicas get removed as well...
|
||||||
|
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
|
||||||
|
RoutingAllocation.Result routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
|
||||||
|
assertThat(routingResult.changed(), equalTo(true));
|
||||||
|
clusterState = ClusterState.builder(clusterState).routingTable(routingResult.routingTable()).build();
|
||||||
|
// the primary gets allocated on another node, replicas are unassigned
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
|
||||||
|
|
||||||
|
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
|
||||||
|
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
|
||||||
|
|
||||||
|
// start the primary shard
|
||||||
|
clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
|
||||||
|
|
||||||
|
// simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it
|
||||||
|
routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
|
||||||
|
assertThat(routingResult.changed(), equalTo(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToElect() {
|
||||||
|
AllocationService allocation = createAllocationService(settingsBuilder()
|
||||||
|
.build());
|
||||||
|
|
||||||
|
MetaData metaData = MetaData.builder()
|
||||||
|
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(2))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
RoutingTable routingTable = RoutingTable.builder()
|
||||||
|
.addAsNew(metaData.index("test"))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
|
||||||
|
|
||||||
|
// add 4 nodes
|
||||||
|
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
|
||||||
|
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState).routingTable()).build();
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
|
||||||
|
// start primary shards
|
||||||
|
clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
|
||||||
|
|
||||||
|
// start another replica shard, while keep one initializing
|
||||||
|
clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, ImmutableList.of(clusterState.routingNodes().shardsWithState(INITIALIZING).get(0))).routingTable()).build();
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2));
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
|
||||||
|
|
||||||
|
// fail the primary shard, check one replica gets elected to primary, others become INITIALIZING (from it)
|
||||||
|
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
|
||||||
|
RoutingAllocation.Result routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
|
||||||
|
assertThat(routingResult.changed(), equalTo(true));
|
||||||
|
clusterState = ClusterState.builder(clusterState).routingTable(routingResult.routingTable()).build();
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
|
||||||
|
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
|
||||||
|
|
||||||
|
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
|
||||||
|
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
|
||||||
|
|
||||||
|
// simulate another failure coming in, with the "old" shard routing, verify that nothing changes, and we ignore it
|
||||||
|
routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
|
||||||
|
assertThat(routingResult.changed(), equalTo(false));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue