From e3d4d73242d215389280b1d7b9facb7b1bd9bd7d Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Sat, 23 Nov 2013 17:03:52 -0500 Subject: [PATCH] Local primaries should be preferred to relocating primaries To reproduce the bug use -Dtests.seed=5AB62524C9AB0489 Fixes #4237 --- .../decider/ThrottlingAllocationDecider.java | 4 +- ...alPrimariesToRelocatingPrimariesTests.java | 125 ++++++++++++++++++ .../allocation/RoutingAllocationTests.java | 17 +++ 3 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 src/test/java/org/elasticsearch/cluster/routing/allocation/PreferLocalPrimariesToRelocatingPrimariesTests.java diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 8af6d854776..0d556d093f8 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -82,7 +82,9 @@ public class ThrottlingAllocationDecider extends AllocationDecider { List shards = node.shards(); for (int i = 0; i < shards.size(); i++) { MutableShardRouting shard = shards.get(i); - if (shard.state() == ShardRoutingState.INITIALIZING && shard.primary()) { + // when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node* + // we only count initial recoveries here, so we need to make sure that relocating node is null + if (shard.state() == ShardRoutingState.INITIALIZING && shard.primary() && shard.relocatingNodeId() == null) { primariesInRecovery++; } } diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferLocalPrimariesToRelocatingPrimariesTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferLocalPrimariesToRelocatingPrimariesTests.java new file mode 100644 index 
00000000000..206fa7bdeae
--- /dev/null
+++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/PreferLocalPrimariesToRelocatingPrimariesTests.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing.allocation;
+
+import com.google.common.collect.ImmutableMap;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.MutableShardRouting;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.junit.Test;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
+import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.newNode;
+import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.startRandomInitializingShard;
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.hamcrest.Matchers.equalTo;
+
+/** Checks that local (non-relocating) primary initializations keep using their own recovery limit while other primaries relocate:
+ *  at every step their count must equal min(node_initial_primaries_recoveries, shards that still have to be initialized). */
+public class PreferLocalPrimariesToRelocatingPrimariesTests extends ElasticsearchTestCase {
+    @Test
+    public void testPreferLocalPrimaryAllocationOverFiltered() {
+        int concurrentRecoveries = randomIntBetween(1, 10); // NOTE: keep the order of these random draws; presumably a fixed test seed replays them in sequence
+        int primaryRecoveries = randomIntBetween(1, 10);
+        int numberOfShards = randomIntBetween(5, 20);
+        int totalNumberOfShards = numberOfShards * 2; // two indices ("test1" and "test2"), each without replicas
+
+        logger.info("create an allocation with [{}] initial primary recoveries and [{}] concurrent recoveries", primaryRecoveries, concurrentRecoveries);
+        AllocationService strategy = new AllocationService(settingsBuilder()
+                .put("cluster.routing.allocation.node_concurrent_recoveries", concurrentRecoveries)
+                .put("cluster.routing.allocation.node_initial_primaries_recoveries", primaryRecoveries)
+                .build());
+
+        logger.info("create 2 indices with [{}] no replicas, and wait till all are allocated", numberOfShards);
+
+        MetaData metaData = MetaData.builder()
+                .put(IndexMetaData.builder("test1").numberOfShards(numberOfShards).numberOfReplicas(0))
+                .put(IndexMetaData.builder("test2").numberOfShards(numberOfShards).numberOfReplicas(0))
+                .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+                .addAsNew(metaData.index("test1"))
+                .addAsNew(metaData.index("test2"))
+                .build();
+
+        ClusterState clusterState = ClusterState.builder().metaData(metaData).routingTable(routingTable).build();
+
+        logger.info("adding two nodes and performing rerouting till all are allocated");
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
+                .put(newNode("node1", ImmutableMap.of("tag1", "value1")))
+                .put(newNode("node2", ImmutableMap.of("tag1", "value2")))).build(); // node2 carries the tag value that is excluded further down
+
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        while (!clusterState.routingNodes().shardsWithState(INITIALIZING).isEmpty()) { // keep starting all INITIALIZING shards until none is left
+            routingTable = strategy.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)).routingTable();
+            clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        }
+
+        logger.info("remove one of the nodes and apply filter to move everything from another node");
+
+        metaData = MetaData.builder() // same two indices, now with an exclude filter on tag1=value2 (the attribute given to node2)
+                .put(IndexMetaData.builder("test1").settings(settingsBuilder()
+                        .put("index.number_of_shards", numberOfShards)
+                        .put("index.number_of_replicas", 0)
+                        .put("index.routing.allocation.exclude.tag1", "value2")
+                        .build()))
+                .put(IndexMetaData.builder("test2").settings(settingsBuilder()
+                        .put("index.number_of_shards", numberOfShards)
+                        .put("index.number_of_replicas", 0)
+                        .put("index.routing.allocation.exclude.tag1", "value2")
+                        .build()))
+                .build();
+        clusterState = ClusterState.builder(clusterState).metaData(metaData).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node1")).build(); // node1 leaves the cluster
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        logger.info("[{}] primaries should be still started but [{}] other primaries should be unassigned", numberOfShards, numberOfShards);
+        assertThat(clusterState.routingNodes().shardsWithState(STARTED).size(), equalTo(numberOfShards));
+        assertThat(clusterState.routingNodes().shardsWithState(INITIALIZING).size(), equalTo(0)); // only node2 is left and the filter excludes it, so nothing may initialize
+        assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(numberOfShards));
+
+        logger.info("start node back up");
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
+                .put(newNode("node1", ImmutableMap.of("tag1", "value1")))).build();
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        while (clusterState.routingNodes().shardsWithState(STARTED).size() < totalNumberOfShards) { // each pass: classify INITIALIZING shards, check the limit, start one shard
+            int localInitializations = 0;
+            int relocatingInitializations = 0;
+            for (MutableShardRouting routing : clusterState.routingNodes().shardsWithState(INITIALIZING)) {
+                if (routing.relocatingNodeId() == null) { // no relocating node id: counted as a local initialization
+                    localInitializations++;
+                } else { // a relocating node id is set: counted as a relocation
+                    relocatingInitializations++;
+                }
+            }
+            int needToInitialize = totalNumberOfShards - clusterState.routingNodes().shardsWithState(STARTED).size() - clusterState.routingNodes().shardsWithState(RELOCATING).size(); // shards that are neither STARTED nor RELOCATING
+            logger.info("local initializations: [{}], relocating: [{}], need to initialize: {}", localInitializations, relocatingInitializations, needToInitialize);
+            assertThat(localInitializations, equalTo(Math.min(primaryRecoveries, needToInitialize))); // relocations must not take slots away from local initializations
+            clusterState = startRandomInitializingShard(clusterState, strategy);
+        }
+    }
+}
diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocationTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocationTests.java
index c55cf145020..f972a39dc95 100644
--- a/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocationTests.java
+++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocationTests.java
@@ -20,14 +20,21 @@
 package org.elasticsearch.cluster.routing.allocation;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.MutableShardRouting;
+import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.common.transport.DummyTransportAddress;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.test.ElasticsearchTestCase;
 import org.junit.Ignore;
 
+import java.util.List;
 import java.util.Map;
 
+import static com.google.common.collect.Lists.newArrayList;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
+
 @Ignore("Not a test")
 public class RoutingAllocationTests extends ElasticsearchTestCase {
 
@@ -42,4 +49,14 @@ public class RoutingAllocationTests extends ElasticsearchTestCase {
public static DiscoveryNode newNode(String nodeId, Map attributes) {
         return new DiscoveryNode("", nodeId, DummyTransportAddress.INSTANCE, attributes, Version.CURRENT);
     }
+
+    /** Starts one randomly picked INITIALIZING shard and returns the resulting state; returns the given state unchanged if no shard is initializing. */
+    public static ClusterState startRandomInitializingShard(ClusterState clusterState, AllocationService strategy) {
+        List<MutableShardRouting> initializingShards = clusterState.routingNodes().shardsWithState(INITIALIZING);
+        if (initializingShards.isEmpty()) {
+            return clusterState;
+        }
+        RoutingTable routingTable = strategy.applyStartedShards(clusterState, newArrayList(initializingShards.get(randomInt(initializingShards.size() - 1)))).routingTable();
+        return ClusterState.builder(clusterState).routingTable(routingTable).build();
+    }
 }
\ No newline at end of file