diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
index 43818aa2fdc..61699b35e14 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java
@@ -54,6 +54,8 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
 
     public static final String CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = "cluster.routing.allocation.node_initial_primaries_recoveries";
     public static final String CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = "cluster.routing.allocation.node_concurrent_recoveries";
+    public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = 2;
+    public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = 4;
 
     private volatile int primariesInitialRecoveries;
     private volatile int concurrentRecoveries;
@@ -62,8 +64,8 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
     public ThrottlingAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
         super(settings);
 
-        this.primariesInitialRecoveries = settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 4);
-        this.concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.concurrent_recoveries", settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, 2));
+        this.primariesInitialRecoveries = settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES);
+        this.concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.concurrent_recoveries", settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES));
         logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries);
 
         nodeSettingsService.addListener(new ApplySettings());
diff --git a/src/test/java/org/elasticsearch/test/integration/cluster/allocation/FilteringAllocationTests.java b/src/test/java/org/elasticsearch/test/integration/cluster/allocation/FilteringAllocationTests.java
index bad2423b7bb..751b1f8ec15 100644
--- a/src/test/java/org/elasticsearch/test/integration/cluster/allocation/FilteringAllocationTests.java
+++ b/src/test/java/org/elasticsearch/test/integration/cluster/allocation/FilteringAllocationTests.java
@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
@@ -115,7 +116,23 @@ public class FilteringAllocationTests extends AbstractNodesTests {
         }
         client("node1").admin().indices().prepareRefresh().execute().actionGet();
         assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
+        ClusterState clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
+        IndexRoutingTable indexRoutingTable = clusterState.routingTable().index("test");
+        int numShardsOnNode1 = 0;
+        for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+            for (ShardRouting shardRouting : indexShardRoutingTable) {
+                if ("node1".equals(clusterState.nodes().get(shardRouting.currentNodeId()).name())) {
+                    numShardsOnNode1++;
+                }
+            }
+        }
+        if (numShardsOnNode1 > ThrottlingAllocationDecider.DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES) {
+            client("node1").admin().cluster().prepareUpdateSettings()
+                    .setTransientSettings(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", numShardsOnNode1)).execute().actionGet();
+            // make sure we can recover all the shards at once; otherwise we might run into a state where one of the shards has not yet started relocating
+            // but we already fired up the request to wait for 0 relocating shards.
+        }
 
         logger.info("--> remove index from the first node");
         client("node1").admin().indices().prepareUpdateSettings("test")
                 .setSettings(settingsBuilder().put("index.routing.allocation.exclude._name", "node1"))
@@ -130,8 +147,8 @@ public class FilteringAllocationTests extends AbstractNodesTests {
         assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
 
         logger.info("--> verify all shards are allocated on node2 now");
-        ClusterState clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
-        IndexRoutingTable indexRoutingTable = clusterState.routingTable().index("test");
+        clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
+        indexRoutingTable = clusterState.routingTable().index("test");
         for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
             for (ShardRouting shardRouting : indexShardRoutingTable) {
                 assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).name(), equalTo("node2"));