Add node version check to shard allocation during restore
Verifies that the version of a node is compatible with the version of a shard that's being restored on this node. Fixes #16519
This commit is contained in:
parent
5da9059d0b
commit
a9eb668497
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.routing.allocation.decider;
|
package org.elasticsearch.cluster.routing.allocation.decider;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.routing.RestoreSource;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
@ -46,8 +47,13 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
|
||||||
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
|
||||||
if (shardRouting.primary()) {
|
if (shardRouting.primary()) {
|
||||||
if (shardRouting.currentNodeId() == null) {
|
if (shardRouting.currentNodeId() == null) {
|
||||||
// fresh primary, we can allocate wherever
|
if (shardRouting.restoreSource() != null) {
|
||||||
return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere");
|
// restoring from a snapshot - check that the node can handle the version
|
||||||
|
return isVersionCompatible(shardRouting.restoreSource(), node, allocation);
|
||||||
|
} else {
|
||||||
|
// fresh primary, we can allocate wherever
|
||||||
|
return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere");
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// relocating primary, only migrate to newer host
|
// relocating primary, only migrate to newer host
|
||||||
return isVersionCompatible(allocation.routingNodes(), shardRouting.currentNodeId(), node, allocation);
|
return isVersionCompatible(allocation.routingNodes(), shardRouting.currentNodeId(), node, allocation);
|
||||||
|
@ -77,4 +83,15 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
|
||||||
target.node().version(), source.node().version());
|
target.node().version(), source.node().version());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Decision isVersionCompatible(RestoreSource restoreSource, final RoutingNode target, RoutingAllocation allocation) {
|
||||||
|
if (target.node().version().onOrAfter(restoreSource.version())) {
|
||||||
|
/* we can allocate if we can restore from a snapshot that is older or on the same version */
|
||||||
|
return allocation.decision(Decision.YES, NAME, "target node version [%s] is same or newer than snapshot version [%s]",
|
||||||
|
target.node().version(), restoreSource.version());
|
||||||
|
} else {
|
||||||
|
return allocation.decision(Decision.NO, NAME, "target node version [%s] is older than snapshot version [%s]",
|
||||||
|
target.node().version(), restoreSource.version());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,14 +20,17 @@
|
||||||
package org.elasticsearch.cluster.routing.allocation;
|
package org.elasticsearch.cluster.routing.allocation;
|
||||||
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.EmptyClusterInfoService;
|
import org.elasticsearch.cluster.EmptyClusterInfoService;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||||
|
import org.elasticsearch.cluster.routing.RestoreSource;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
@ -39,6 +42,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
|
||||||
|
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.logging.Loggers;
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -337,6 +341,37 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
|
||||||
assertThat(result.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
|
assertThat(result.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() {
|
||||||
|
final DiscoveryNode newNode = new DiscoveryNode("newNode", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||||
|
final DiscoveryNode oldNode1 = new DiscoveryNode("oldNode1", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion());
|
||||||
|
final DiscoveryNode oldNode2 = new DiscoveryNode("oldNode2", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion());
|
||||||
|
|
||||||
|
int numberOfShards = randomIntBetween(1, 3);
|
||||||
|
MetaData metaData = MetaData.builder()
|
||||||
|
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numberOfShards).numberOfReplicas
|
||||||
|
(randomIntBetween(0, 3)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
|
||||||
|
.metaData(metaData)
|
||||||
|
.routingTable(RoutingTable.builder().addAsRestore(metaData.index("test"), new RestoreSource(new SnapshotId("rep1", "snp1"),
|
||||||
|
Version.CURRENT, "test")).build())
|
||||||
|
.nodes(DiscoveryNodes.builder().put(newNode).put(oldNode1).put(oldNode2)).build();
|
||||||
|
AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{
|
||||||
|
new ReplicaAfterPrimaryActiveAllocationDecider(Settings.EMPTY),
|
||||||
|
new NodeVersionAllocationDecider(Settings.EMPTY)});
|
||||||
|
AllocationService strategy = new MockAllocationService(Settings.EMPTY,
|
||||||
|
allocationDeciders,
|
||||||
|
new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
|
||||||
|
RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
|
||||||
|
|
||||||
|
// Make sure that primary shards are only allocated on the new node
|
||||||
|
for (int i = 0; i < numberOfShards; i++) {
|
||||||
|
assertEquals("newNode", result.routingTable().index("test").getShards().get(i).primaryShard().currentNodeId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private ClusterState stabilize(ClusterState clusterState, AllocationService service) {
|
private ClusterState stabilize(ClusterState clusterState, AllocationService service) {
|
||||||
logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint());
|
logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue