Add NodeVersionAllocationDecider that prevent allocations that require forward compatibility.

Today during restart scenarios it is possible that we recover from a node that
has already been upgraded to version N+1. The node that we relocate to is
on version N and might not be able to read the index format from the node
we relocate from. This causes `IndexFormatToNewException` during
recovery but only after recovery has finished which can cause large
load spikes during the upgrade period.

Closes #4588
This commit is contained in:
Simon Willnauer 2014-01-03 13:53:53 +01:00
parent d5c440cd2e
commit 5b5b2e6c85
5 changed files with 429 additions and 4 deletions

View File

@ -787,10 +787,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (shard.started()) { if (shard.started()) {
// skip initializing, unassigned and relocating shards we can't relocate them anyway // skip initializing, unassigned and relocating shards we can't relocate them anyway
Decision allocationDecision = deciders.canAllocate(shard, node, allocation); Decision allocationDecision = deciders.canAllocate(shard, node, allocation);
Decision rebalanceDecission = deciders.canRebalance(shard, allocation); Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE)) if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE))
&& ((rebalanceDecission.type() == Type.YES) || (rebalanceDecission.type() == Type.THROTTLE))) { && ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) {
Decision srcDecision; Decision srcDecision;
if ((srcDecision = maxNode.removeShard(shard)) != null) { if ((srcDecision = maxNode.removeShard(shard)) != null) {
minNode.addShard(shard, srcDecision); minNode.addShard(shard, srcDecision);
@ -798,7 +797,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (delta < minCost) { if (delta < minCost) {
minCost = delta; minCost = delta;
candidate = shard; candidate = shard;
decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecission); decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
} }
minNode.removeShard(shard); minNode.removeShard(shard);
maxNode.addShard(shard, srcDecision); maxNode.addShard(shard, srcDecision);

View File

@ -73,6 +73,7 @@ public class AllocationDecidersModule extends AbstractModule {
add(DisableAllocationDecider.class). add(DisableAllocationDecider.class).
add(AwarenessAllocationDecider.class). add(AwarenessAllocationDecider.class).
add(ShardsLimitAllocationDecider.class). add(ShardsLimitAllocationDecider.class).
add(NodeVersionAllocationDecider.class).
add(DiskThresholdDecider.class). add(DiskThresholdDecider.class).
add(SnapshotInProgressAllocationDecider.class).build(); add(SnapshotInProgressAllocationDecider.class).build();
} }

View File

@ -0,0 +1,75 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
/**
* An allocation decider that prevents relocation or allocation from nodes
* that might note be version compatible. If we relocate from a node that runs
* a newer version than the node we relocate to this might cause {@link org.apache.lucene.index.IndexFormatTooNewException}
* on the lowest level since it might have already written segments that use a new postings format or codec that is not
* available on the target node.
*/
public class NodeVersionAllocationDecider extends AllocationDecider {
@Inject
public NodeVersionAllocationDecider(Settings settings) {
super(settings);
}
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
String sourceNodeId = shardRouting.currentNodeId();
/* if sourceNodeId is not null we do a relocation and just check the version of the node
* that we are currently allocate on. If not we are initializing and recover from primary.*/
if (sourceNodeId == null) { // we allocate - check primary
if (shardRouting.primary()) {
// we are the primary we can allocate wherever
return Decision.YES;
}
final MutableShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
if (primary == null) { // we have a primary - it's a start ;)
return Decision.YES;
}
sourceNodeId = primary.currentNodeId();
}
return isVersionCompatible(allocation.routingNodes(), sourceNodeId, node);
}
private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target) {
final RoutingNode source = routingNodes.node(sourceNodeId);
if (target.node().version().onOrAfter(source.node().version())) {
/* we can allocate if we can recover from a node that is younger or on the same version
* if the primary is already running on a newer version that won't work due to possible
* differences in the lucene index format etc.*/
return Decision.YES;
} else {
return Decision.NO;
}
}
}

View File

@ -0,0 +1,346 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;
import java.util.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.*;
/**
*
*/
public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTestCase {
private final ESLogger logger = Loggers.getLogger(NodeVersionAllocationDeciderTests.class);
@Test
public void testDoNotAllocateFromPrimary() {
AllocationService strategy = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(5).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.index("test").shards().size(), equalTo(5));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(2).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).shards().get(2).currentNodeId(), nullValue());
}
logger.info("start two nodes and fully start the shards");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingTable prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(2));
}
logger.info("start all the primary shards, replicas will start initializing");
RoutingNodes routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(1));
}
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(STARTED).size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(1));
}
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node3", getPreviousVersion())))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(STARTED).size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(UNASSIGNED).size(), equalTo(1));
}
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node4")))
.build();
prevRoutingTable = routingTable;
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(STARTED).size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(INITIALIZING).size(), equalTo(1));
}
routingNodes = clusterState.routingNodes();
prevRoutingTable = routingTable;
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).replicaShardsWithState(STARTED).size(), equalTo(2));
}
}
@Test
public void testRandom() {
AllocationService service = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());
logger.info("Building initial routing table");
MetaData.Builder builder = MetaData.builder();
RoutingTable.Builder rtBuilder = RoutingTable.builder();
int numIndices = between(1, 20);
for (int i = 0; i < numIndices; i++) {
builder.put(IndexMetaData.builder("test_" + i).numberOfShards(between(1, 5)).numberOfReplicas(between(0, 2)));
}
MetaData metaData = builder.build();
for (int i = 0; i < numIndices; i++) {
rtBuilder.addAsNew(metaData.index("test_" + i));
}
RoutingTable routingTable = rtBuilder.build();
ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(routingTable.allShards().size()));
List<DiscoveryNode> nodes = new ArrayList<DiscoveryNode>();
int nodeIdx = 0;
int iters = atLeast(10);
for (int i = 0; i < iters; i++) {
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
int numNodes = between(1, 20);
if (nodes.size() > numNodes) {
Collections.shuffle(nodes, getRandom());
nodes = nodes.subList(0, numNodes);
} else {
for (int j = nodes.size(); j < numNodes; j++) {
if (frequently()) {
nodes.add(newNode("node" + (nodeIdx++), randomBoolean() ? getPreviousVersion() : Version.CURRENT));
} else {
nodes.add(newNode("node" + (nodeIdx++), randomVersion()));
}
}
}
for (DiscoveryNode node : nodes) {
nodesBuilder.put(node);
}
clusterState = ClusterState.builder(clusterState).nodes(nodesBuilder).build();
clusterState = stabelize(clusterState, service);
}
}
@Test
public void testRollingRestart() {
AllocationService service = createAllocationService(settingsBuilder()
.put("cluster.routing.allocation.concurrent_recoveries", 10)
.put("cluster.routing.allocation.allow_rebalance", "always")
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1)
.build());
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").numberOfShards(5).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build();
assertThat(routingTable.index("test").shards().size(), equalTo(5));
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(2).state(), equalTo(UNASSIGNED));
assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), nullValue());
assertThat(routingTable.index("test").shard(i).shards().get(2).currentNodeId(), nullValue());
}
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("old0", getPreviousVersion()))
.put(newNode("old1", getPreviousVersion()))
.put(newNode("old2", getPreviousVersion()))).build();
clusterState = stabelize(clusterState, service);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("old0", getPreviousVersion()))
.put(newNode("old1", getPreviousVersion()))
.put(newNode("new0"))).build();
clusterState = stabelize(clusterState, service);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("node0", getPreviousVersion()))
.put(newNode("new1"))
.put(newNode("new0"))).build();
clusterState = stabelize(clusterState, service);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("new2"))
.put(newNode("new1"))
.put(newNode("new0"))).build();
clusterState = stabelize(clusterState, service);
routingTable = clusterState.routingTable();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
assertThat(routingTable.index("test").shard(i).shards().get(0).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).shards().get(1).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).shards().get(2).state(), equalTo(STARTED));
assertThat(routingTable.index("test").shard(i).shards().get(0).currentNodeId(), notNullValue());
assertThat(routingTable.index("test").shard(i).shards().get(1).currentNodeId(), notNullValue());
assertThat(routingTable.index("test").shard(i).shards().get(2).currentNodeId(), notNullValue());
}
}
private ClusterState stabelize(ClusterState clusterState, AllocationService service) {
logger.debug("RoutingNodes: {}", clusterState.routingNodes().prettyPrint());
RoutingTable routingTable = service.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
assertRecoveryNodeVersions(routingNodes);
logger.info("start all the primary shards, replicas will start initializing");
routingNodes = clusterState.routingNodes();
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertRecoveryNodeVersions(routingNodes);
logger.info("start the replica shards");
routingNodes = clusterState.routingNodes();
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while (true) {
logger.debug("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint());
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
if (routingTable == prev)
break;
assertRecoveryNodeVersions(routingNodes);
prev = routingTable;
}
return clusterState;
}
private final void assertRecoveryNodeVersions(RoutingNodes routingNodes) {
logger.debug("RoutingNodes: {}", routingNodes.prettyPrint());
List<MutableShardRouting> mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.RELOCATING);
for (MutableShardRouting r : mutableShardRoutings) {
String toId = r.relocatingNodeId();
String fromId = r.currentNodeId();
assertThat(fromId, notNullValue());
assertThat(toId, notNullValue());
logger.debug("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version());
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
}
mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.INITIALIZING);
for (MutableShardRouting r : mutableShardRoutings) {
if (r.initializing() && r.relocatingNodeId() == null && !r.primary()) {
MutableShardRouting primary = routingNodes.activePrimary(r);
assertThat(primary, notNullValue());
String fromId = primary.currentNodeId();
String toId = r.currentNodeId();
logger.debug("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version());
assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
}
}
}
}

View File

@ -100,6 +100,10 @@ public class ElasticsearchAllocationTestCase extends ElasticsearchTestCase {
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT); return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
} }
public static DiscoveryNode newNode(String nodeId, Version version) {
return new DiscoveryNode(nodeId, DummyTransportAddress.INSTANCE, version);
}
public static ClusterState startRandomInitializingShard(ClusterState clusterState, AllocationService strategy) { public static ClusterState startRandomInitializingShard(ClusterState clusterState, AllocationService strategy) {
List<MutableShardRouting> initializingShards = clusterState.routingNodes().shardsWithState(INITIALIZING); List<MutableShardRouting> initializingShards = clusterState.routingNodes().shardsWithState(INITIALIZING);
if (initializingShards.isEmpty()) { if (initializingShards.isEmpty()) {