Local primaries should be preferred to relocating primaries

To reproduce the bug use -Dtests.seed=5AB62524C9AB0489
Fixes #4237
This commit is contained in:
Igor Motov 2013-11-23 17:03:52 -05:00
parent 8e17d636ef
commit e3d4d73242
3 changed files with 145 additions and 1 deletions

View File

@ -82,7 +82,9 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
List<MutableShardRouting> shards = node.shards();
for (int i = 0; i < shards.size(); i++) {
MutableShardRouting shard = shards.get(i);
if (shard.state() == ShardRoutingState.INITIALIZING && shard.primary()) {
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
// we only count initial recoveries here, so we need to make sure that relocating node is null
if (shard.state() == ShardRoutingState.INITIALIZING && shard.primary() && shard.relocatingNodeId() == null) {
primariesInRecovery++;
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.startRandomInitializingShard;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class PreferLocalPrimariesToRelocatingPrimariesTests extends ElasticsearchTestCase {
@Test
public void testPreferLocalPrimaryAllocationOverFiltered() {
int concurrentRecoveries = randomIntBetween(1, 10);
int primaryRecoveries = randomIntBetween(1, 10);
int numberOfShards = randomIntBetween(5, 20);
int totalNumberOfShards = numberOfShards * 2;
logger.info("create an allocation with [{}] initial primary recoveries and [{}] concurrent recoveries", primaryRecoveries, concurrentRecoveries);
AllocationService strategy = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.node_concurrent_recoveries", concurrentRecoveries)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", primaryRecoveries)
.build());
logger.info("create 2 indices with [{}] no replicas, and wait till all are allocated", numberOfShards);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test1").numberOfShards(numberOfShards).numberOfReplicas(0))
.put(IndexMetaData.builder("test2").numberOfShards(numberOfShards).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test1"))
.addAsNew(metaData.index("test2"))
.build();
ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build();
logger.info("adding two nodes and performing rerouting till all are allocated");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("node1", ImmutableMap.of("tag1", "value1")))
.put(newNode("node2", ImmutableMap.of("tag1", "value2")))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
while (!clusterState.routingNodes().shardsWithState(INITIALIZING).isEmpty()) {
routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
}
logger.info("remove one of the nodes and apply filter to move everything from another node");
metaData = MetaData.builder()
.put(IndexMetaData.builder("test1").settings(settingsBuilder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0)
.put("index.routing.allocation.exclude.tag1", "value2")
.build()))
.put(IndexMetaData.builder("test2").settings(settingsBuilder()
.put("index.number_of_shards", numberOfShards)
.put("index.number_of_replicas", 0)
.put("index.routing.allocation.exclude.tag1", "value2")
.build()))
.build();
clusterState = ClusterState.builder(clusterState).metaData(metaData).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
logger.info("[{}] primaries should be still started but [{}] other primaries should be unassigned", numberOfShards, numberOfShards);
assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(numberOfShards));
assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(numberOfShards));
logger.info("start node back up");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node1", ImmutableMap.of("tag1", "value1")))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
while (clusterState.routingNodes().shardsWithState(STARTED).size() < totalNumberOfShards) {
int localInitializations = 0;
int relocatingInitializations = 0;
for (MutableShardRouting routing : clusterState.routingNodes().shardsWithState(INITIALIZING)) {
if (routing.relocatingNodeId() == null) {
localInitializations++;
} else {
relocatingInitializations++;
}
}
int needToInitialize = totalNumberOfShards - clusterState.routingNodes().shardsWithState(STARTED).size() - clusterState.routingNodes().shardsWithState(RELOCATING).size();
logger.info("local initializations: [{}], relocating: [{}], need to initialize: {}", localInitializations, relocatingInitializations, needToInitialize);
assertThat(localInitializations, equalTo(Math.min(primaryRecoveries, needToInitialize)));
clusterState = startRandomInitializingShard(clusterState, strategy);
}
}
}

View File

@ -20,14 +20,21 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Ignore;
import java.util.List;
import java.util.Map;
import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@Ignore("Not a test")
public class RoutingAllocationTests extends ElasticsearchTestCase {
@ -42,4 +49,14 @@ public class RoutingAllocationTests extends ElasticsearchTestCase {
public static DiscoveryNode newNode(String nodeId, Map<String, String> attributes) {
return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
}
public static ClusterState startRandomInitializingShard(ClusterState clusterState, AllocationService strategy) {
List<MutableShardRouting> initializingShards = clusterState.routingNodes().shardsWithState(INITIALIZING);
if (initializingShards.isEmpty()) {
return clusterState;
}
RoutingTable routingTable = strategy.applyStartedShards(clusterState, newArrayList(initializingShards.get(randomInt(initializingShards.size() - 1)))).routingTable();
return ClusterState.builder(clusterState).routingTable(routingTable).build();
}
}