Cannot force allocate primary to a node where the shard already exists (#22031)

Before, it was possible that the SameShardAllocationDecider would allow
force allocation of an unassigned primary to the same node on which an
active replica is assigned.  This could only happen with shadow replica
indices, because when a shadow replica primary fails, the replica gets
promoted to primary but in the INITIALIZED state, not in the STARTED
state (because the engine has specific reinitialization that must take
place in the case of shadow replicas).  Therefore, if the now promoted
primary that is initializing fails also, the primary will be in the
unassigned state, because replica to primary promotion only happens when
the failed shard was in the started state.  The now unassigned primary
shard will go through the allocation deciders, where the
SameShardsAllocationDecider would return a NO decision, but would still
permit force allocation on the primary if all deciders returned NO.

This commit implements canForceAllocatePrimary on the
SameShardAllocationDecider, which ensures that a primary cannot be
force allocated to the same node on which an active replica already
exists.
This commit is contained in:
Ali Beyad 2016-12-08 12:21:19 -05:00 committed by GitHub
parent 8fe4bc1b74
commit 3da04293f3
2 changed files with 86 additions and 34 deletions

View File

@ -59,6 +59,54 @@ public class SameShardAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());
Decision decision = decideSameNode(shardRouting, node, allocation, assignedShards);
if (decision.type() == Decision.Type.NO || sameHost == false) {
// if its already a NO decision looking at the node, or we aren't configured to look at the host, return the decision
return decision;
}
if (node.node() != null) {
for (RoutingNode checkNode : allocation.routingNodes()) {
if (checkNode.node() == null) {
continue;
}
// check if its on the same host as the one we want to allocate to
boolean checkNodeOnSameHostName = false;
boolean checkNodeOnSameHostAddress = false;
if (Strings.hasLength(checkNode.node().getHostAddress()) && Strings.hasLength(node.node().getHostAddress())) {
if (checkNode.node().getHostAddress().equals(node.node().getHostAddress())) {
checkNodeOnSameHostAddress = true;
}
} else if (Strings.hasLength(checkNode.node().getHostName()) && Strings.hasLength(node.node().getHostName())) {
if (checkNode.node().getHostName().equals(node.node().getHostName())) {
checkNodeOnSameHostName = true;
}
}
if (checkNodeOnSameHostAddress || checkNodeOnSameHostName) {
for (ShardRouting assignedShard : assignedShards) {
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
String hostType = checkNodeOnSameHostAddress ? "address" : "name";
String host = checkNodeOnSameHostAddress ? node.node().getHostAddress() : node.node().getHostName();
return allocation.decision(Decision.NO, NAME,
"the shard cannot be allocated on host %s [%s], where it already exists on node [%s]; " +
"set [%s] to false to allow multiple nodes on the same host to hold the same shard copies",
hostType, host, node.nodeId(), CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey());
}
}
}
}
}
return allocation.decision(Decision.YES, NAME, "the shard does not exist on the same host");
}
@Override
public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
assert shardRouting.primary() : "must not call force allocate on a non-primary shard";
Iterable<ShardRouting> assignedShards = allocation.routingNodes().assignedShards(shardRouting.shardId());
return decideSameNode(shardRouting, node, allocation, assignedShards);
}
private Decision decideSameNode(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation,
Iterable<ShardRouting> assignedShards) {
for (ShardRouting assignedShard : assignedShards) {
if (node.nodeId().equals(assignedShard.currentNodeId())) {
if (assignedShard.isSameAllocation(shardRouting)) {
@ -72,39 +120,6 @@ public class SameShardAllocationDecider extends AllocationDecider {
}
}
}
if (sameHost) {
if (node.node() != null) {
for (RoutingNode checkNode : allocation.routingNodes()) {
if (checkNode.node() == null) {
continue;
}
// check if its on the same host as the one we want to allocate to
boolean checkNodeOnSameHostName = false;
boolean checkNodeOnSameHostAddress = false;
if (Strings.hasLength(checkNode.node().getHostAddress()) && Strings.hasLength(node.node().getHostAddress())) {
if (checkNode.node().getHostAddress().equals(node.node().getHostAddress())) {
checkNodeOnSameHostAddress = true;
}
} else if (Strings.hasLength(checkNode.node().getHostName()) && Strings.hasLength(node.node().getHostName())) {
if (checkNode.node().getHostName().equals(node.node().getHostName())) {
checkNodeOnSameHostName = true;
}
}
if (checkNodeOnSameHostAddress || checkNodeOnSameHostName) {
for (ShardRouting assignedShard : assignedShards) {
if (checkNode.nodeId().equals(assignedShard.currentNodeId())) {
String hostType = checkNodeOnSameHostAddress ? "address" : "name";
String host = checkNodeOnSameHostAddress ? node.node().getHostAddress() : node.node().getHostName();
return allocation.decision(Decision.NO, NAME,
"the shard cannot be allocated on host %s [%s], where it already exists on node [%s]; " +
"set [%s] to false to allow multiple nodes on the same host to hold the same shard copies",
hostType, host, node.nodeId(), CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING.getKey());
}
}
}
}
}
}
return allocation.decision(Decision.YES, NAME, "the shard does not exist on the same " + (sameHost ? "host" : "node"));
return allocation.decision(Decision.YES, NAME, "the shard does not exist on the same node");
}
}

View File

@ -21,6 +21,8 @@ package org.elasticsearch.cluster.routing.allocation;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
@ -28,12 +30,20 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import java.util.Collections;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@ -86,4 +96,31 @@ public class SameShardRoutingTests extends ESAllocationTestCase {
assertThat(shardRouting.currentNodeId(), equalTo("node3"));
}
}
public void testForceAllocatePrimaryOnSameNodeNotAllowed() {
SameShardAllocationDecider decider = new SameShardAllocationDecider(Settings.EMPTY);
ClusterState clusterState = ClusterStateCreationUtils.state("idx", randomIntBetween(2, 4), 1);
Index index = clusterState.getMetaData().index("idx").getIndex();
ShardRouting primaryShard = clusterState.routingTable().index(index).shard(0).primaryShard();
RoutingNode routingNode = clusterState.getRoutingNodes().node(primaryShard.currentNodeId());
RoutingAllocation routingAllocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, Collections.emptyList()),
new RoutingNodes(clusterState, false), clusterState, ClusterInfo.EMPTY, System.nanoTime(), false
);
// can't force allocate same shard copy to the same node
ShardRouting newPrimary = TestShardRouting.newShardRouting(primaryShard.shardId(), null, true, ShardRoutingState.UNASSIGNED);
Decision decision = decider.canForceAllocatePrimary(newPrimary, routingNode, routingAllocation);
assertEquals(Decision.Type.NO, decision.type());
// can force allocate to a different node
RoutingNode unassignedNode = null;
for (RoutingNode node : clusterState.getRoutingNodes()) {
if (node.isEmpty()) {
unassignedNode = node;
break;
}
}
decision = decider.canForceAllocatePrimary(newPrimary, unassignedNode, routingAllocation);
assertEquals(Decision.Type.YES, decision.type());
}
}