Fix version-based allocation decider to prevent peer recovery from node with older version

Relocating a non-primary shard from one node to another is actually done by recovering from the active
primary shard in the cluster, and not the node that we are logically relocating from.

Closes #15775
This commit is contained in:
Yannick Welsch 2016-01-05 16:47:04 +01:00
parent 7e3ccf2ee3
commit 55cc88e1ae
2 changed files with 81 additions and 17 deletions

View File

@ -44,22 +44,24 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
String sourceNodeId = shardRouting.currentNodeId();
/* if sourceNodeId is not null we do a relocation and just check the version of the node
* that we are currently allocate on. If not we are initializing and recover from primary.*/
if (sourceNodeId == null) { // we allocate - check primary
if (shardRouting.primary()) {
// we are the primary we can allocate wherever
if (shardRouting.primary()) {
if (shardRouting.currentNodeId() == null) {
// fresh primary, we can allocate wherever
return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere");
} else {
// relocating primary, only migrate to newer host
return isVersionCompatible(allocation.routingNodes(), shardRouting.currentNodeId(), node, allocation);
}
} else {
final ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
if (primary == null) { // we have a primary - it's a start ;)
// check that active primary has a newer version so that peer recovery works
if (primary != null) {
return isVersionCompatible(allocation.routingNodes(), primary.currentNodeId(), node, allocation);
} else {
// ReplicaAfterPrimaryActiveAllocationDecider should prevent this case from occurring
return allocation.decision(Decision.YES, NAME, "no active primary shard yet");
}
sourceNodeId = primary.currentNodeId();
}
return isVersionCompatible(allocation.routingNodes(), sourceNodeId, node, allocation);
}
private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) {

View File

@ -21,19 +21,32 @@ package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.util.ArrayList;
import java.util.Collections;
@ -285,6 +298,45 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
}
}
public void testRebalanceDoesNotAllocatePrimaryAndReplicasOnDifferentVersionNodes() {
ShardId shard1 = new ShardId("test1", 0);
ShardId shard2 = new ShardId("test2", 0);
final DiscoveryNode newNode = new DiscoveryNode("newNode", DummyTransportAddress.INSTANCE, Version.CURRENT);
final DiscoveryNode oldNode1 = new DiscoveryNode("oldNode1", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion());
final DiscoveryNode oldNode2 = new DiscoveryNode("oldNode2", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder(shard1.getIndex()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder(shard2.getIndex()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shard1.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(shard1)
.addShard(TestShardRouting.newShardRouting(shard1.getIndex(), shard1.getId(), newNode.id(), true, ShardRoutingState.STARTED, 10))
.addShard(TestShardRouting.newShardRouting(shard1.getIndex(), shard1.getId(), oldNode1.id(), false, ShardRoutingState.STARTED, 10))
.build())
)
.add(IndexRoutingTable.builder(shard2.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(shard2)
.addShard(TestShardRouting.newShardRouting(shard2.getIndex(), shard2.getId(), newNode.id(), true, ShardRoutingState.STARTED, 10))
.addShard(TestShardRouting.newShardRouting(shard2.getIndex(), shard2.getId(), oldNode1.id(), false, ShardRoutingState.STARTED, 10))
.build())
)
.build();
ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(newNode).put(oldNode1).put(oldNode2)).build();
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new NodeVersionAllocationDecider(Settings.EMPTY)});
AllocationService strategy = new MockAllocationService(Settings.EMPTY,
allocationDeciders,
new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
// the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match
state = ClusterState.builder(state).routingResult(result).build();
assertThat(result.routingTable().index(shard2.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
assertThat(result.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
}
private ClusterState stabilize(ClusterState clusterState, AllocationService service) {
logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint());
@ -317,17 +369,27 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
List<ShardRouting> mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.RELOCATING);
for (ShardRouting r : mutableShardRoutings) {
String toId = r.relocatingNodeId();
String fromId = r.currentNodeId();
assertThat(fromId, notNullValue());
assertThat(toId, notNullValue());
logger.trace("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version());
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
if (r.primary()) {
String toId = r.relocatingNodeId();
String fromId = r.currentNodeId();
assertThat(fromId, notNullValue());
assertThat(toId, notNullValue());
logger.trace("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version());
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
} else {
ShardRouting primary = routingNodes.activePrimary(r);
assertThat(primary, notNullValue());
String fromId = primary.currentNodeId();
String toId = r.relocatingNodeId();
logger.error("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version());
logger.error(routingNodes.prettyPrint());
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
}
}
mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.INITIALIZING);
for (ShardRouting r : mutableShardRoutings) {
if (r.initializing() && r.relocatingNodeId() == null && !r.primary()) {
if (!r.primary()) {
ShardRouting primary = routingNodes.activePrimary(r);
assertThat(primary, notNullValue());
String fromId = primary.currentNodeId();