From 55cc88e1ae9360e9400cd1075306c2c5d94a81ba Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 5 Jan 2016 16:47:04 +0100 Subject: [PATCH] Fix version-based allocation decider to prevent peer recovery from node with older version Relocating a non-primary shard from one node to another is actually done by recovering from the active primary shard in the cluster, and not the node that we are logically relocating from. Closes #15775 --- .../decider/NodeVersionAllocationDecider.java | 22 +++--- .../NodeVersionAllocationDeciderTests.java | 76 +++++++++++++++++-- 2 files changed, 81 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java index 7aab2e4b8ac..3fe34948acd 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java @@ -44,22 +44,24 @@ public class NodeVersionAllocationDecider extends AllocationDecider { @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - String sourceNodeId = shardRouting.currentNodeId(); - /* if sourceNodeId is not null we do a relocation and just check the version of the node - * that we are currently allocate on. If not we are initializing and recover from primary.*/ - if (sourceNodeId == null) { // we allocate - check primary - if (shardRouting.primary()) { - // we are the primary we can allocate wherever + if (shardRouting.primary()) { + if (shardRouting.currentNodeId() == null) { + // fresh primary, we can allocate wherever return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere"); + } else { + // relocating primary, only migrate to newer host + return isVersionCompatible(allocation.routingNodes(), shardRouting.currentNodeId(), node, allocation); } + } else { final ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting); - if (primary == null) { // we have a primary - it's a start ;) + // check that active primary has a newer version so that peer recovery works + if (primary != null) { + return isVersionCompatible(allocation.routingNodes(), primary.currentNodeId(), node, allocation); + } else { + // ReplicaAfterPrimaryActiveAllocationDecider should prevent this case from occurring return allocation.decision(Decision.YES, NAME, "no active primary shard yet"); } - sourceNodeId = primary.currentNodeId(); } - return isVersionCompatible(allocation.routingNodes(), sourceNodeId, node, allocation); - } private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) { diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java index 2b0c7ef6bda..f52178444d6 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -21,19 +21,32 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; +import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.DummyTransportAddress; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.test.gateway.NoopGatewayAllocator; import java.util.ArrayList; import java.util.Collections; @@ -285,6 +298,45 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase { } } + public void testRebalanceDoesNotAllocatePrimaryAndReplicasOnDifferentVersionNodes() { + ShardId shard1 = new ShardId("test1", 0); + ShardId shard2 = new ShardId("test2", 0); + final DiscoveryNode newNode = new DiscoveryNode("newNode", DummyTransportAddress.INSTANCE, Version.CURRENT); + final DiscoveryNode oldNode1 = new DiscoveryNode("oldNode1", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion()); + final DiscoveryNode oldNode2 = new DiscoveryNode("oldNode2", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion()); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder(shard1.getIndex()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetaData.builder(shard2.getIndex()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1)) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .add(IndexRoutingTable.builder(shard1.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(shard1) + .addShard(TestShardRouting.newShardRouting(shard1.getIndex(), shard1.getId(), newNode.id(), true, ShardRoutingState.STARTED, 10)) + .addShard(TestShardRouting.newShardRouting(shard1.getIndex(), shard1.getId(), oldNode1.id(), false, ShardRoutingState.STARTED, 10)) + .build()) + ) + .add(IndexRoutingTable.builder(shard2.getIndex()) + .addIndexShard(new IndexShardRoutingTable.Builder(shard2) + .addShard(TestShardRouting.newShardRouting(shard2.getIndex(), shard2.getId(), newNode.id(), true, ShardRoutingState.STARTED, 10)) + .addShard(TestShardRouting.newShardRouting(shard2.getIndex(), shard2.getId(), oldNode1.id(), false, ShardRoutingState.STARTED, 10)) + .build()) + ) + .build(); + ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT) + .metaData(metaData) + .routingTable(routingTable) + .nodes(DiscoveryNodes.builder().put(newNode).put(oldNode1).put(oldNode2)).build(); + AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new NodeVersionAllocationDecider(Settings.EMPTY)}); + AllocationService strategy = new MockAllocationService(Settings.EMPTY, + allocationDeciders, + new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); + RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true); + // the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match + state = ClusterState.builder(state).routingResult(result).build(); + assertThat(result.routingTable().index(shard2.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0)); + assertThat(result.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0)); + } + private ClusterState stabilize(ClusterState clusterState, AllocationService service) { logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint()); @@ -317,17 +369,27 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase { List mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.RELOCATING); for (ShardRouting r : mutableShardRoutings) { - String toId = r.relocatingNodeId(); - String fromId = r.currentNodeId(); - assertThat(fromId, notNullValue()); - assertThat(toId, notNullValue()); - logger.trace("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version()); - assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version())); + if (r.primary()) { + String toId = r.relocatingNodeId(); + String fromId = r.currentNodeId(); + assertThat(fromId, notNullValue()); + assertThat(toId, notNullValue()); + logger.trace("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version()); + assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version())); + } else { + ShardRouting primary = routingNodes.activePrimary(r); + assertThat(primary, notNullValue()); + String fromId = primary.currentNodeId(); + String toId = r.relocatingNodeId(); + logger.error("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version()); + logger.error(routingNodes.prettyPrint()); + assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version())); + } } mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.INITIALIZING); for (ShardRouting r : mutableShardRoutings) { - if (r.initializing() && r.relocatingNodeId() == null && !r.primary()) { + if (!r.primary()) { ShardRouting primary = routingNodes.activePrimary(r); assertThat(primary, notNullValue()); String fromId = primary.currentNodeId();