Improve handling of replicas when a primary shard fails

As part of #6808 we improved the handling of a failing primary to make sure that replicas which are still initializing are failed as well. On closer inspection, that change has two problems: first, if the same shard routing is failed again, there is no protection against applying the failure a second time (which we do have for other failed-shard cases); second, we already tried to handle this case (incorrectly) in the elect-primary method.
This change moves the handling into the elect-primary method, where it now works correctly, and adds unit tests to verify the behavior.
The change also exposed a problem in our handling of replica shards that remain initializing while the primary fails and another replica shard is elected as primary: we need to cancel the replica's ongoing recovery so that it restarts from the newly elected primary.
Closes #6825
Shay Banon 2014-07-11 09:11:51 +02:00
parent a84777e990
commit 01ca81e2a3
6 changed files with 195 additions and 71 deletions
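For context on the first problem: the fix relies on applyFailedShard being a no-op when the routing it is handed no longer matches a live entry in the routing nodes, which is what the new tests assert via changed == false. A minimal sketch of such an idempotency guard, with hypothetical helper names (the real method in AllocationService does more work):

// Sketch only, not the commit's code: apply a shard failure at most once.
// If the routing reported as failed no longer exists on its node (it was
// already failed, or a new primary was elected meanwhile), ignore it.
private boolean applyFailedShard(RoutingAllocation allocation, ShardRouting failedShard, boolean addToIgnoreList) {
    RoutingNode node = allocation.routingNodes().node(failedShard.currentNodeId());
    if (node == null) {
        return false; // the node itself is gone, nothing to fail
    }
    for (MutableShardRouting current : node) {
        if (current.equals(failedShard)) {
            // exact match: this failure has not been applied yet
            return failShardOnNode(allocation, current, addToIgnoreList); // failShardOnNode is hypothetical
        }
    }
    return false; // stale routing: already applied, report no change
}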

File: RoutingNodes.java

@@ -310,6 +310,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
for (RoutingNode routingNode : this) {
shards.addAll(routingNode.shardsWithState(state));
}
for (ShardRoutingState s : state) {
if (s == ShardRoutingState.UNASSIGNED) {
Iterables.addAll(shards, unassigned());
break;
}
}
return shards;
}
@@ -319,6 +325,16 @@ public class RoutingNodes implements Iterable<RoutingNode> {
for (RoutingNode routingNode : this) {
shards.addAll(routingNode.shardsWithState(index, state));
}
for (ShardRoutingState s : state) {
if (s == ShardRoutingState.UNASSIGNED) {
for (MutableShardRouting unassignedShard : unassignedShards) {
if (unassignedShard.index().equals(index)) {
shards.add(unassignedShard);
}
}
break;
}
}
return shards;
}
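A note on this RoutingNodes change: shardsWithState and its index-scoped variant previously only aggregated shards assigned to nodes, so asking for UNASSIGNED returned nothing; the new unit tests below depend on the amended behavior, for example:

// With this change, UNASSIGNED is served from the unassigned list as well,
// so assertions like this one (taken from the tests below) see replicas
// that lost their primary:
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));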

File: AllocationService.java

@@ -192,7 +192,7 @@ public class AllocationService extends AbstractComponent {
// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
- changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
+ changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
@@ -210,13 +210,13 @@ public class AllocationService extends AbstractComponent {
// elect primaries *before* allocating unassigned, so backups of primaries that failed
// will be moved to primary state and not wait for primaries to be allocated and recovered (*from gateway*)
- changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
+ changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().hasUnassigned()) {
changed |= shardsAllocators.allocateUnassigned(allocation);
// elect primaries again, in case this is needed with unassigned allocation
- changed |= electPrimariesAndUnassignDanglingReplicas(allocation);
+ changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
}
// move shards that no longer can be allocated
@@ -269,13 +269,31 @@ public class AllocationService extends AbstractComponent {
return changed;
}
- private boolean electPrimariesAndUnassignDanglingReplicas(RoutingAllocation allocation) {
+ private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes();
if (!routingNodes.hasUnassignedPrimaries()) {
// move out if we don't have unassigned primaries
return changed;
}
// go over and remove dangling replicas that are initializing for primary shards
List<ShardRouting> shardsToFail = Lists.newArrayList();
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
if (!routing.primary() && routing.initializing()) {
shardsToFail.add(routing);
}
}
}
}
for (ShardRouting shardToFail : shardsToFail) {
changed |= applyFailedShard(allocation, shardToFail, false);
}
// now, go over and elect a new primary if possible; note, from this code block on, if one is elected,
// routingNodes.hasUnassignedPrimaries() will potentially be false
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry);
@@ -298,28 +316,6 @@ public class AllocationService extends AbstractComponent {
}
}
// go over and remove dangling replicas that are initializing, but we couldn't elect primary ones...
List<ShardRouting> shardsToFail = null;
if (routingNodes.hasUnassignedPrimaries()) {
for (MutableShardRouting shardEntry : routingNodes.unassigned()) {
if (shardEntry.primary()) {
for (MutableShardRouting routing : routingNodes.assignedShards(shardEntry)) {
if (!routing.primary()) {
changed = true;
if (shardsToFail == null) {
shardsToFail = new ArrayList<>();
}
shardsToFail.add(routing);
}
}
}
}
if (shardsToFail != null) {
for (ShardRouting shardToFail : shardsToFail) {
applyFailedShard(allocation, shardToFail, false);
}
}
}
return changed;
}
@@ -421,23 +417,6 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = allocation.routingNodes();
boolean dirty = false;
if (failedShard.primary()) {
// we have to fail the initializing replicas if the primary fails
// since they might not yet have started the recovery and then they will
// stick in the cluster-state forever since the replica has a retry logic that
// retries infinitely in that case.
List<MutableShardRouting> initializingReplicas = new ArrayList<>();
for (MutableShardRouting shard : routingNodes.assignedShards(failedShard)){
if (!shard.primary() && shard.initializing()) {
initializingReplicas.add(shard);
}
}
// we can't do this in the loop above since we modify the iterator and will get
// concurrent modification exceptions
for (MutableShardRouting shard : initializingReplicas) {
dirty |= applyFailedShard(allocation, shard, addToIgnoreList);
}
}
if (failedShard.relocatingNodeId() != null) {
// the shard is relocating, either in initializing (recovery from another node) or relocating (moving to another node)
if (failedShard.state() == INITIALIZING) {

File: IndicesClusterStateService.java

@@ -24,6 +24,7 @@ import com.carrotsearch.hppc.ObjectContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
@@ -541,6 +542,18 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService>
// cancel recovery just in case we are in recovery (it's fine if we are not in recovery, it will be a no-op).
recoveryTarget.cancelRecovery(indexShard);
indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
} else if (isPeerRecovery(shardRouting)) {
// check if there is an ongoing recovery, and if the source node is not the same, cancel the recovery to restart it
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard.shardId());
if (recoveryStatus != null && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
// we have an ongoing recovery, find the source based on current routing and compare them
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
if (!recoveryStatus.sourceNode().equals(sourceNode)) {
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
recoveryTarget.cancelRecovery(indexShard);
indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
}
}
}
}
@@ -630,34 +643,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService>
// figure out where to recover from (node or disk, in which case sourceNode is null)
DiscoveryNode sourceNode = null;
if (!shardRouting.primary()) {
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
for (ShardRouting entry : shardRoutingTable) {
if (entry.primary() && entry.started()) {
// only recover from a started primary; if we can't find one, we will do it next round
sourceNode = nodes.get(entry.currentNodeId());
if (sourceNode == null) {
logger.trace("can't recover replica because primary shard {} is assigned to an unknown node. ignoring.", entry);
return;
}
break;
}
}
if (sourceNode == null) {
logger.trace("can't recover replica for {} because a primary shard can not be found. ignoring.", shardRouting.shardId());
return;
}
} else if (shardRouting.relocatingNodeId() != null) {
sourceNode = nodes.get(shardRouting.relocatingNodeId());
if (sourceNode == null) {
logger.trace("can't recover from remote primary shard {} because it is assigned to an unknown node [{}]. ignoring.", shardRouting.shardId(), shardRouting.relocatingNodeId());
return;
}
if (isPeerRecovery(shardRouting)) {
sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
}
// if there is no shard, create it
if (!indexService.hasShard(shardId)) {
if (failedShards.containsKey(shardRouting.shardId())) {
@@ -750,6 +739,45 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<IndicesClusterStateService>
}
}
/**
* Finds the routing source node for peer recovery, returning null if none is found. Note, this method expects the shard
* routing to *require* peer recovery; use {@link #isPeerRecovery(org.elasticsearch.cluster.routing.ShardRouting)} to
* check whether it is needed.
*/
private DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
DiscoveryNode sourceNode = null;
if (!shardRouting.primary()) {
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
for (ShardRouting entry : shardRoutingTable) {
if (entry.primary() && entry.started()) {
// only recover from a started primary; if we can't find one, we will do it next round
sourceNode = nodes.get(entry.currentNodeId());
if (sourceNode == null) {
logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node. ignoring.", entry);
return null;
}
break;
}
}
if (sourceNode == null) {
logger.trace("can't find replica source node for {} because a primary shard can not be found. ignoring.", shardRouting.shardId());
}
} else if (shardRouting.relocatingNodeId() != null) {
sourceNode = nodes.get(shardRouting.relocatingNodeId());
if (sourceNode == null) {
logger.trace("can't find relocation source node for shard {} because it is assigned to an unknown node [{}]. ignoring.", shardRouting.shardId(), shardRouting.relocatingNodeId());
}
} else {
throw new ElasticsearchIllegalStateException("trying to find source node for peer recovery when routing state means no peer recovery: " + shardRouting);
}
return sourceNode;
}
private boolean isPeerRecovery(ShardRouting shardRouting) {
return !shardRouting.primary() || shardRouting.relocatingNodeId() != null;
}
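Taken together, the intended calling pattern for these two helpers looks roughly like this (condensed from the hunks above, not additional code in the commit):

// Gate on isPeerRecovery first, then resolve the source node; a null
// source means "no started primary / unknown node yet", so we simply
// try again on a later cluster-state update.
if (isPeerRecovery(shardRouting)) {
    DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
    if (sourceNode == null) {
        return; // we will retry on the next round
    }
    // ... start the recovery from sourceNode, or compare it against an ongoing one ...
}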
private class PeerRecoveryListener implements RecoveryTarget.RecoveryListener {
private final StartRecoveryRequest request;

File: RecoveryStatus.java

@@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.InternalIndexShard;
@@ -41,10 +42,12 @@ public class RecoveryStatus {
final long recoveryId;
final InternalIndexShard indexShard;
final RecoveryState recoveryState;
final DiscoveryNode sourceNode;
- public RecoveryStatus(long recoveryId, InternalIndexShard indexShard) {
+ public RecoveryStatus(long recoveryId, InternalIndexShard indexShard, DiscoveryNode sourceNode) {
this.recoveryId = recoveryId;
this.indexShard = indexShard;
this.sourceNode = sourceNode;
this.shardId = indexShard.shardId();
this.recoveryState = new RecoveryState(shardId);
recoveryState.getTimer().startTime(System.currentTimeMillis());
@@ -57,6 +60,10 @@ public class RecoveryStatus {
private volatile ConcurrentMap<String, IndexOutput> openIndexOutputs = ConcurrentCollections.newConcurrentMap();
public final Store.LegacyChecksums legacyChecksums = new Store.LegacyChecksums();
public DiscoveryNode sourceNode() {
return this.sourceNode;
}
public RecoveryState recoveryState() {
return recoveryState;
}
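Storing the source node on RecoveryStatus is what enables the new cancellation check in IndicesClusterStateService; condensed from the hunks above:

// The remembered source node lets us detect that the routing now points
// at a different primary than the one this recovery started from:
RecoveryStatus recoveryStatus = recoveryTarget.recoveryStatus(indexShard.shardId());
if (recoveryStatus != null && recoveryStatus.stage() != RecoveryState.Stage.DONE) {
    DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
    if (!recoveryStatus.sourceNode().equals(sourceNode)) {
        recoveryTarget.cancelRecovery(indexShard);
        indexService.removeShard(shardRouting.id(), "removing shard (recovery source node changed)");
    }
}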

File: RecoveryTarget.java

@@ -163,7 +163,7 @@ public class RecoveryTarget extends AbstractComponent {
return;
}
// create a new recovery status, and process...
- final RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard);
+ final RecoveryStatus recoveryStatus = new RecoveryStatus(request.recoveryId(), indexShard, request.sourceNode());
recoveryStatus.recoveryState.setType(request.recoveryType());
recoveryStatus.recoveryState.setSourceNode(request.sourceNode());
recoveryStatus.recoveryState.setTargetNode(request.targetNode());

File: FailedShardsRoutingTests.java

@@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@@ -482,4 +483,97 @@ public class FailedShardsRoutingTests extends ElasticsearchAllocationTestCase {
// make sure the failedShard is not INITIALIZING again on node3
assertThat(routingNodes.node("node3").get(0).shardId(), not(equalTo(shardToFail.shardId())));
}
@Test
public void testFailAllReplicasInitializingOnPrimaryFail() {
AllocationService allocation = createAllocationService(settingsBuilder()
.build());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
// add 4 nodes
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState).routingTable()).build();
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
// start primary shards
clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
// fail the primary shard, check that the replicas get removed as well...
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
RoutingAllocation.Result routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
assertThat(routingResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(routingResult.routingTable()).build();
// the primary gets allocated on another node, replicas are unassigned
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
// start the primary shard
clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
// simulate another failure coming in with the "old" shard routing, and verify that nothing changes and the failure is ignored
routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
assertThat(routingResult.changed(), equalTo(false));
}
@Test
public void testFailAllReplicasInitializingOnPrimaryFailWhileHavingAReplicaToElect() {
AllocationService allocation = createAllocationService(settingsBuilder()
.build());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(1).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
// add 4 nodes
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
clusterState = ClusterState.builder(clusterState).routingTable(allocation.reroute(clusterState).routingTable()).build();
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(UNASSIGNED).size(), equalTo(2));
// start primary shards
clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable()).build();
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
// start another replica shard, while keeping one initializing
clusterState = ClusterState.builder(clusterState).routingTable(allocation.applyStartedShards(clusterState, ImmutableList.of(clusterState.routingNodes().shardsWithState(INITIALIZING).get(0))).routingTable()).build();
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(2));
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(1));
// fail the primary shard, check that one replica gets elected as primary and the others become INITIALIZING (recovering from it)
ShardRouting primaryShardToFail = clusterState.routingTable().index("test").shard(0).primaryShard();
RoutingAllocation.Result routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
assertThat(routingResult.changed(), equalTo(true));
clusterState = ClusterState.builder(clusterState).routingTable(routingResult.routingTable()).build();
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(2));
ShardRouting newPrimaryShard = clusterState.routingTable().index("test").shard(0).primaryShard();
assertThat(newPrimaryShard, not(equalTo(primaryShardToFail)));
// simulate another failure coming in with the "old" shard routing, and verify that nothing changes and the failure is ignored
routingResult = allocation.applyFailedShard(clusterState, primaryShardToFail);
assertThat(routingResult.changed(), equalTo(false));
}
}