diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 7bd7ea9bbaf..0d79392cdda 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -787,10 +787,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if (shard.started()) { // skip initializing, unassigned and relocating shards we can't relocate them anyway Decision allocationDecision = deciders.canAllocate(shard, node, allocation); - Decision rebalanceDecission = deciders.canRebalance(shard, allocation); - + Decision rebalanceDecision = deciders.canRebalance(shard, allocation); if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE)) - && ((rebalanceDecission.type() == Type.YES) || (rebalanceDecission.type() == Type.THROTTLE))) { + && ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) { Decision srcDecision; if ((srcDecision = maxNode.removeShard(shard)) != null) { minNode.addShard(shard, srcDecision); @@ -798,7 +797,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if (delta < minCost) { minCost = delta; candidate = shard; - decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecission); + decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); } minNode.removeShard(shard); maxNode.addShard(shard, srcDecision); diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java index a07198e675a..30c63b4db33 100644 --- 
a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/AllocationDecidersModule.java @@ -73,6 +73,7 @@ public class AllocationDecidersModule extends AbstractModule { add(DisableAllocationDecider.class). add(AwarenessAllocationDecider.class). add(ShardsLimitAllocationDecider.class). + add(NodeVersionAllocationDecider.class). add(DiskThresholdDecider.class). add(SnapshotInProgressAllocationDecider.class).build(); } diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java new file mode 100644 index 00000000000..219b9a9bbe2 --- /dev/null +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java @@ -0,0 +1,75 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.cluster.routing.allocation.decider; + +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; + +/** + * An allocation decider that prevents relocation or allocation from nodes + * that might not be version compatible. If we relocate from a node that runs + * a newer version than the node we relocate to this might cause {@link org.apache.lucene.index.IndexFormatTooNewException} + * on the lowest level since it might have already written segments that use a new postings format or codec that is not + * available on the target node. + */ +public class NodeVersionAllocationDecider extends AllocationDecider { + + @Inject + public NodeVersionAllocationDecider(Settings settings) { + super(settings); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + String sourceNodeId = shardRouting.currentNodeId(); + /* if sourceNodeId is not null we do a relocation and just check the version of the node + * that we are currently allocated on. 
If not, we are initializing and recovering from the primary. */ + if (sourceNodeId == null) { // we allocate - check primary + if (shardRouting.primary()) { + // we are the primary we can allocate wherever + return Decision.YES; + } + final MutableShardRouting primary = allocation.routingNodes().activePrimary(shardRouting); + if (primary == null) { // no active primary to recover from, so there is no source version to check + return Decision.YES; + } + sourceNodeId = primary.currentNodeId(); + } + return isVersionCompatible(allocation.routingNodes(), sourceNodeId, node); + + } + + private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target) { + final RoutingNode source = routingNodes.node(sourceNodeId); + if (target.node().version().onOrAfter(source.node().version())) { + /* we can allocate if we can recover from a node that is older or on the same version + * if the primary is already running on a newer version that won't work due to possible + * differences in the lucene index format etc.*/ + return Decision.YES; + } else { + return Decision.NO; + } + } +} diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java new file mode 100644 index 00000000000..62a5f2e9006 --- /dev/null +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -0,0 +1,346 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.test.ElasticsearchAllocationTestCase; +import org.junit.Test; + +import java.util.*; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.Matchers.*; + +/** + * + */ +public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTestCase { + + private final ESLogger logger = Loggers.getLogger(NodeVersionAllocationDeciderTests.class); + + @Test + public void testDoNotAllocateFromPrimary() { + AllocationService strategy = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .build()); + + 
logger.info("Building initial routing table"); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").numberOfShards(5).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build(); + + assertThat(routingTable.index("test").shards().size(), equalTo(5)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(2).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(2).currentNodeId(), nullValue()); + } + + logger.info("start two nodes and fully start the shards"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(2)); + + } + + logger.info("start all the primary 
shards, replicas will start initializing"); + RoutingNodes routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(1)); + } + + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShardsWithState(STARTED).size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(1)); + } + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) + .put(newNode("node3", getPreviousVersion()))) + .build(); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + 
routingNodes = clusterState.routingNodes(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShardsWithState(STARTED).size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(1)); + } + + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()) + .put(newNode("node4"))) + .build(); + prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).replicaShardsWithState(STARTED).size(), equalTo(1)); + assertThat(routingTable.index("test").shard(i).replicaShardsWithState(INITIALIZING).size(), equalTo(1)); + } + + routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED)); + 
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(STARTED).size(), equalTo(2)); + } + } + + + @Test + public void testRandom() { + AllocationService service = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .build()); + + logger.info("Building initial routing table"); + MetaData.Builder builder = MetaData.builder(); + RoutingTable.Builder rtBuilder = RoutingTable.builder(); + int numIndices = between(1, 20); + for (int i = 0; i < numIndices; i++) { + builder.put(IndexMetaData.builder("test_" + i).numberOfShards(between(1, 5)).numberOfReplicas(between(0, 2))); + } + MetaData metaData = builder.build(); + + for (int i = 0; i < numIndices; i++) { + rtBuilder.addAsNew(metaData.index("test_" + i)); + } + RoutingTable routingTable = rtBuilder.build(); + + ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build(); + assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(routingTable.allShards().size())); + List nodes = new ArrayList(); + int nodeIdx = 0; + int iters = atLeast(10); + for (int i = 0; i < iters; i++) { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(); + int numNodes = between(1, 20); + if (nodes.size() > numNodes) { + Collections.shuffle(nodes, getRandom()); + nodes = nodes.subList(0, numNodes); + } else { + for (int j = nodes.size(); j < numNodes; j++) { + if (frequently()) { + nodes.add(newNode("node" + (nodeIdx++), randomBoolean() ? 
getPreviousVersion() : Version.CURRENT)); + } else { + nodes.add(newNode("node" + (nodeIdx++), randomVersion())); + } + } + } + for (DiscoveryNode node : nodes) { + nodesBuilder.put(node); + } + clusterState = ClusterState.builder(clusterState).nodes(nodesBuilder).build(); + clusterState = stabelize(clusterState, service); + } + } + + @Test + public void testRollingRestart() { + AllocationService service = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put("cluster.routing.allocation.allow_rebalance", "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").numberOfShards(5).numberOfReplicas(2)) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build(); + + assertThat(routingTable.index("test").shards().size(), equalTo(5)); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(2).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(2).currentNodeId(), nullValue()); + } + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .put(newNode("old0", getPreviousVersion())) + 
.put(newNode("old1", getPreviousVersion())) + .put(newNode("old2", getPreviousVersion()))).build(); + clusterState = stabelize(clusterState, service); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .put(newNode("old0", getPreviousVersion())) + .put(newNode("old1", getPreviousVersion())) + .put(newNode("new0"))).build(); + + clusterState = stabelize(clusterState, service); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .put(newNode("node0", getPreviousVersion())) + .put(newNode("new1")) + .put(newNode("new0"))).build(); + + clusterState = stabelize(clusterState, service); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .put(newNode("new2")) + .put(newNode("new1")) + .put(newNode("new0"))).build(); + + clusterState = stabelize(clusterState, service); + routingTable = clusterState.routingTable(); + for (int i = 0; i < routingTable.index("test").shards().size(); i++) { + assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); + assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).shards().get(2).state(), equalTo(STARTED)); + assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), notNullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), notNullValue()); + assertThat(routingTable.index("test").shard(i).shards().get(2).currentNodeId(), notNullValue()); + } + } + + private ClusterState stabelize(ClusterState clusterState, AllocationService service) { + logger.debug("RoutingNodes: {}", clusterState.routingNodes().prettyPrint()); + + RoutingTable routingTable = service.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + 
RoutingNodes routingNodes = clusterState.routingNodes(); + assertRecoveryNodeVersions(routingNodes); + + logger.info("start all the primary shards, replicas will start initializing"); + routingNodes = clusterState.routingNodes(); + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + assertRecoveryNodeVersions(routingNodes); + + logger.info("start the replica shards"); + routingNodes = clusterState.routingNodes(); + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + logger.info("complete rebalancing"); + RoutingTable prev = routingTable; + while (true) { + logger.debug("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint()); + routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + if (routingTable == prev) + break; + assertRecoveryNodeVersions(routingNodes); + prev = routingTable; + } + return clusterState; + } + + private final void assertRecoveryNodeVersions(RoutingNodes routingNodes) { + logger.debug("RoutingNodes: {}", routingNodes.prettyPrint()); + + List mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.RELOCATING); + for (MutableShardRouting r : mutableShardRoutings) { + String toId = r.relocatingNodeId(); + String fromId = r.currentNodeId(); + assertThat(fromId, notNullValue()); + assertThat(toId, notNullValue()); + logger.debug("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with 
Version: " + routingNodes.node(toId).node().version()); + assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version())); + } + + mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.INITIALIZING); + for (MutableShardRouting r : mutableShardRoutings) { + if (r.initializing() && r.relocatingNodeId() == null && !r.primary()) { + MutableShardRouting primary = routingNodes.activePrimary(r); + assertThat(primary, notNullValue()); + String fromId = primary.currentNodeId(); + String toId = r.currentNodeId(); + logger.debug("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version()); + assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version())); + } + } + + + } +} diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java b/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java index f3d4416d343..4b655fb2ba4 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchAllocationTestCase.java @@ -100,6 +100,10 @@ public class ElasticsearchAllocationTestCase extends ElasticsearchTestCase { return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); } + public static DiscoveryNode newNode(String nodeId, Version version) { + return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, version); + } + public static ClusterState startRandomInitializingShard(ClusterState clusterState, AllocationService strategy) { List initializingShards = clusterState.routingNodes().shardsWithState(INITIALIZING); if (initializingShards.isEmpty()) {