Only fail relocation target shard if failing source shard is a primary (#18574)

If the relocation source fails during the relocation of a shard from one node to another, the
relocation target is currently failed as well. For replica shards this is not necessary,
however, as the actual shard recovery of the relocation target is done via the primary shard.
This commit is contained in:
Yannick Welsch 2016-05-27 15:28:57 +02:00
parent 123e40726e
commit 2b47a2643c
6 changed files with 119 additions and 25 deletions

View File

@ -23,12 +23,9 @@ import com.carrotsearch.hppc.ObjectIntHashMap;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -808,12 +805,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
Map<Index, Integer> indicesAndShards = new HashMap<>(); Map<Index, Integer> indicesAndShards = new HashMap<>();
for (RoutingNode node : routingNodes) { for (RoutingNode node : routingNodes) {
for (ShardRouting shard : node) { for (ShardRouting shard : node) {
if (!shard.active() && shard.relocatingNodeId() == null) { if (shard.initializing() && shard.relocatingNodeId() == null) {
if (!shard.relocating()) { inactiveShardCount++;
inactiveShardCount++; if (shard.primary()) {
if (shard.primary()) { inactivePrimaryCount++;
inactivePrimaryCount++;
}
} }
} }
if (shard.relocating()) { if (shard.relocating()) {
@ -986,6 +981,20 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return unassigned; return unassigned;
} }
/**
* Removes relocation source of an initializing non-primary shard. This allows the replica shard to continue recovery from
* the primary even though its non-primary relocation source has failed.
*/
public ShardRouting removeRelocationSource() {
assert shard.isRelocationTarget();
ensureMutable();
ShardRouting relocationMarkerRemoved = shard.removeRelocationSource();
updateAssigned(shard, relocationMarkerRemoved);
inactiveShardCount++; // relocation targets are not counted as inactive shards whereas initializing shards are
Recoveries.getOrAdd(recoveriesPerNode, shard.relocatingNodeId()).addOutgoing(-1);
return relocationMarkerRemoved;
}
public ShardRouting current() { public ShardRouting current() {
return shard; return shard;
} }

View File

@ -371,6 +371,20 @@ public final class ShardRouting implements Writeable, ToXContent {
null, AllocationId.cancelRelocation(allocationId), UNAVAILABLE_EXPECTED_SHARD_SIZE); null, AllocationId.cancelRelocation(allocationId), UNAVAILABLE_EXPECTED_SHARD_SIZE);
} }
/**
* Removes relocation source of a non-primary shard. The shard state must be <code>INITIALIZING</code>.
* This allows the non-primary shard to continue recovery from the primary even though its non-primary
* relocation source has failed.
*/
public ShardRouting removeRelocationSource() {
assert primary == false : this;
assert state == ShardRoutingState.INITIALIZING : this;
assert assignedToNode() : this;
assert relocatingNodeId != null : this;
return new ShardRouting(shardId, currentNodeId, null, restoreSource, primary, state, unassignedInfo,
AllocationId.finishRelocation(allocationId), expectedShardSize);
}
/** /**
* Moves the shard from started to initializing * Moves the shard from started to initializing
*/ */

View File

@ -576,13 +576,11 @@ public class AllocationService extends AbstractComponent {
} }
} }
} else { } else {
// The fail shard is the main copy of the current shard routing. Any // The fail shard is the main copy of the current shard routing.
// relocation will be cancelled (and the target shard removed as well)
// and the shard copy needs to be marked as unassigned
boolean addAsUnassigned = true;
if (failedShard.relocatingNodeId() != null) { if (failedShard.relocatingNodeId() != null) {
// handle relocation source shards. we need to find the target initializing shard that is recovering, and remove it... // now, find the shard that is initializing on the target node
assert failedShard.initializing() == false; // should have been dealt with and returned
assert failedShard.relocating(); assert failedShard.relocating();
RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId()); RoutingNodes.RoutingNodeIterator initializingNode = routingNodes.routingNodeIter(failedShard.relocatingNodeId());
@ -590,14 +588,25 @@ public class AllocationService extends AbstractComponent {
while (initializingNode.hasNext()) { while (initializingNode.hasNext()) {
ShardRouting shardRouting = initializingNode.next(); ShardRouting shardRouting = initializingNode.next();
if (shardRouting.isRelocationTargetOf(failedShard)) { if (shardRouting.isRelocationTargetOf(failedShard)) {
logger.trace("{} is removed due to the failure of the source shard", shardRouting); if (failedShard.primary()) {
initializingNode.remove(); logger.trace("{} is removed due to the failure of the source shard", shardRouting);
// cancel and remove target shard
initializingNode.remove();
} else {
logger.trace("{}, relocation source failed, mark as initializing without relocation source", shardRouting);
// promote to initializing shard without relocation source and ensure that removed relocation source
// is not added back as unassigned shard
initializingNode.removeRelocationSource();
addAsUnassigned = false;
}
break;
} }
} }
} }
} }
if (addAsUnassigned) {
matchedNode.moveToUnassigned(unassignedInfo); matchedNode.moveToUnassigned(unassignedInfo);
}
} }
assert matchedNode.isRemoved() : "failedShard " + failedShard + " was matched but wasn't removed"; assert matchedNode.isRemoved() : "failedShard " + failedShard + " was matched but wasn't removed";
return true; return true;

View File

@ -146,7 +146,6 @@ public class CancelAllocationCommand implements AllocationCommand {
} }
} else if (shardRouting.relocating()) { } else if (shardRouting.relocating()) {
// the shard is relocating to another node, cancel the recovery on the other node, and deallocate this one
if (!allowPrimary && shardRouting.primary()) { if (!allowPrimary && shardRouting.primary()) {
// can't cancel a primary shard being initialized // can't cancel a primary shard being initialized
if (explain) { if (explain) {
@ -156,17 +155,30 @@ public class CancelAllocationCommand implements AllocationCommand {
throw new IllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + throw new IllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " +
discoNode + ", shard is primary and initializing its state"); discoNode + ", shard is primary and initializing its state");
} }
shardRouting = it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null)); it.remove();
// now, go and find the shard that is initializing on the target node, and cancel it as well... boolean addAsUnassigned = true;
// now, find the shard that is initializing on the target node
RoutingNodes.RoutingNodeIterator initializingNode = allocation.routingNodes().routingNodeIter(shardRouting.relocatingNodeId()); RoutingNodes.RoutingNodeIterator initializingNode = allocation.routingNodes().routingNodeIter(shardRouting.relocatingNodeId());
if (initializingNode != null) { if (initializingNode != null) {
while (initializingNode.hasNext()) { while (initializingNode.hasNext()) {
ShardRouting initializingShardRouting = initializingNode.next(); ShardRouting initializingShardRouting = initializingNode.next();
if (initializingShardRouting.isRelocationTargetOf(shardRouting)) { if (initializingShardRouting.isRelocationTargetOf(shardRouting)) {
initializingNode.remove(); if (shardRouting.primary()) {
// cancel and remove target shard
initializingNode.remove();
} else {
// promote to initializing shard without relocation source and ensure that removed relocation source
// is not added back as unassigned shard
initializingNode.removeRelocationSource();
addAsUnassigned = false;
}
break;
} }
} }
} }
if (addAsUnassigned) {
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REROUTE_CANCELLED, null));
}
} }
} else { } else {
// the shard is not relocating, its either started, or initializing, just cancel it and move on... // the shard is not relocating, its either started, or initializing, just cancel it and move on...

View File

@ -59,6 +59,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
/** /**
*/ */
@ -381,15 +382,42 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1)); assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1));
logger.info("--> move the replica shard again");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", 0, "node2", "node3")), false, false);
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(RELOCATING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> cancel the source replica shard");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node2", false)), false, false);
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(INITIALIZING).get(0).relocatingNodeId(), nullValue());
logger.info("--> start the former target replica shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(STARTED).size(), equalTo(1));
logger.info("--> cancel the primary allocation (with allow_primary set to true)"); logger.info("--> cancel the primary allocation (with allow_primary set to true)");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", true)), false, false); rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand("test", 0, "node1", true)), false, false);
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(rerouteResult.changed(), equalTo(true)); assertThat(rerouteResult.changed(), equalTo(true));
logger.error(clusterState.prettyPrint()); assertThat(clusterState.getRoutingNodes().node("node3").shardsWithState(STARTED).iterator().next().primary(), equalTo(true));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).iterator().next().primary(), equalTo(true));
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(0));
assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0)); assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0));
} }
public void testSerialization() throws Exception { public void testSerialization() throws Exception {

View File

@ -408,6 +408,28 @@ public class IndexRecoveryIT extends ESIntegTestCase {
assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, Type.REPLICA, nodeB, nodeC, false); assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, Type.REPLICA, nodeB, nodeC, false);
validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex()); validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
if (randomBoolean()) {
// shutdown node with relocation source of replica shard and check if recovery continues
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeA));
ensureStableCluster(2);
response = client().admin().indices().prepareRecoveries(INDEX_NAME).execute().actionGet();
recoveryStates = response.shardRecoveryStates().get(INDEX_NAME);
nodeARecoveryStates = findRecoveriesForTargetNode(nodeA, recoveryStates);
assertThat(nodeARecoveryStates.size(), equalTo(0));
nodeBRecoveryStates = findRecoveriesForTargetNode(nodeB, recoveryStates);
assertThat(nodeBRecoveryStates.size(), equalTo(1));
nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates);
assertThat(nodeCRecoveryStates.size(), equalTo(1));
assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false);
validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex());
assertOnGoingRecoveryState(nodeCRecoveryStates.get(0), 0, Type.REPLICA, nodeB, nodeC, false);
validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
}
logger.info("--> speeding up recoveries"); logger.info("--> speeding up recoveries");
restoreRecoverySpeed(); restoreRecoverySpeed();
ensureGreen(); ensureGreen();