fix test bug where a small time window can trigger a false failure due to the default concurrent recoveries limit
parent 237c4ddf54
commit c5395436e6
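Why the window exists: once node1 is excluded, ThrottlingAllocationDecider lets only cluster.routing.allocation.node_concurrent_recoveries shards (2 by default) relocate off the node at a time. A health request that waits for zero relocating shards can therefore return while some shards are still throttled and have not started relocating yet, and the follow-up assertion that every shard lives on node2 fails spuriously. In outline, the guard the test now adds (same names as in the test hunk below):

    // Outline of the fix in the test: if node1 holds more shards than the default
    // throttle allows, raise the transient setting so every shard can start
    // relocating at once before the exclude filter is applied.
    if (numShardsOnNode1 > ThrottlingAllocationDecider.DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES) {
        client("node1").admin().cluster().prepareUpdateSettings()
                .setTransientSettings(settingsBuilder()
                        .put("cluster.routing.allocation.node_concurrent_recoveries", numShardsOnNode1))
                .execute().actionGet();
    }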
@@ -54,6 +54,8 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
     public static final String CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = "cluster.routing.allocation.node_initial_primaries_recoveries";
     public static final String CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = "cluster.routing.allocation.node_concurrent_recoveries";
+    public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = 2;
+    public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = 4;
 
     private volatile int primariesInitialRecoveries;
     private volatile int concurrentRecoveries;
@@ -62,8 +64,8 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
     public ThrottlingAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
         super(settings);
 
-        this.primariesInitialRecoveries = settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 4);
-        this.concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.concurrent_recoveries", settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, 2));
+        this.primariesInitialRecoveries = settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES);
+        this.concurrentRecoveries = settings.getAsInt("cluster.routing.allocation.concurrent_recoveries", settings.getAsInt(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES));
         logger.debug("using node_concurrent_recoveries [{}], node_initial_primaries_recoveries [{}]", concurrentRecoveries, primariesInitialRecoveries);
 
         nodeSettingsService.addListener(new ApplySettings());
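For reference, a minimal sketch of how the new constants act purely as fallbacks in Settings#getAsInt. The settings value and the ImmutableSettings builder used here are illustrative assumptions, not part of the change:

    // Illustrative only: getAsInt(key, default) returns the configured value when
    // the key is set, and the named default constant otherwise.
    Settings settings = ImmutableSettings.settingsBuilder()
            .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, 5) // assumed value
            .build();
    int concurrentRecoveries = settings.getAsInt(
            ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES,
            ThrottlingAllocationDecider.DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES); // -> 5, not 2

Exposing the defaults as public constants is what lets the test below compare against the real throttle limit instead of repeating the magic number 2.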
@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
@@ -115,7 +116,23 @@ public class FilteringAllocationTests extends AbstractNodesTests {
         }
         client("node1").admin().indices().prepareRefresh().execute().actionGet();
         assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().getCount(), equalTo(100l));
+        ClusterState clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
+        IndexRoutingTable indexRoutingTable = clusterState.routingTable().index("test");
+        int numShardsOnNode1 = 0;
+        for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
+            for (ShardRouting shardRouting : indexShardRoutingTable) {
+                if ("node1".equals(clusterState.nodes().get(shardRouting.currentNodeId()).name())) {
+                    numShardsOnNode1++;
+                }
+            }
+        }
 
+        if (numShardsOnNode1 > ThrottlingAllocationDecider.DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES) {
+            client("node1").admin().cluster().prepareUpdateSettings()
+                    .setTransientSettings(settingsBuilder().put("cluster.routing.allocation.node_concurrent_recoveries", numShardsOnNode1)).execute().actionGet();
+            // make sure we can recover all the shards at once, otherwise we might run into a state where one of the shards has not yet started relocating
+            // but we already fired up the request to wait for 0 relocating shards.
+        }
         logger.info("--> remove index from the first node");
         client("node1").admin().indices().prepareUpdateSettings("test")
                 .setSettings(settingsBuilder().put("index.routing.allocation.exclude._name", "node1"))
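The comment in the hunk above refers to a cluster-health call made later in the test that waits for zero relocating shards. A sketch of that kind of request, for context only (not part of this diff; the exact builder call is an assumption based on the pre-5.0 client API):

    // Sketch only: returns as soon as no shard is in the RELOCATING state, which is
    // why every relocation must already have started for the later
    // "all shards on node2" assertion to hold.
    ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth()
            .setWaitForRelocatingShards(0)
            .execute().actionGet();
    assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));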
@@ -130,8 +147,8 @@ public class FilteringAllocationTests extends AbstractNodesTests {
         assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
 
         logger.info("--> verify all shards are allocated on node2 now");
-        ClusterState clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
-        IndexRoutingTable indexRoutingTable = clusterState.routingTable().index("test");
+        clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
+        indexRoutingTable = clusterState.routingTable().index("test");
         for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
             for (ShardRouting shardRouting : indexShardRoutingTable) {
                 assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).name(), equalTo("node2"));