Improve performance of shards limits decider (#53577)

On clusters with a large number of shards, the shards limits allocation
decider can exhibit poor performance leading to timeouts applying
cluster state updates. This occurs because, for every shard, we loop over
all shards on the node to count both the total number of shards on the
node and the number of shards for that shard's index. This is roughly
quadratic in the number of shards. This loop is not necessary, since we
already have an O(1) method
to count the number of non-relocating shards on a node, and with this
commit we add some infrastructure to RoutingNode to make counting the
number of shards per index O(1).
This commit is contained in:
Jason Tedor 2020-03-18 20:57:50 -04:00
parent 415d73c27d
commit 90ab949415
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
3 changed files with 54 additions and 16 deletions

View File

@ -21,15 +21,20 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
@ -48,6 +53,8 @@ public class RoutingNode implements Iterable<ShardRouting> {
private final LinkedHashSet<ShardRouting> relocatingShards;
private final HashMap<Index, LinkedHashSet<ShardRouting>> shardsByIndex;
/**
 * Convenience constructor that builds the shard-id-to-routing map from the given shards
 * and delegates to the map-based constructor.
 *
 * @param nodeId the id of the node this routing node represents
 * @param node   the corresponding discovery node
 * @param shards the shard routings initially allocated to this node
 */
public RoutingNode(String nodeId, DiscoveryNode node, ShardRouting... shards) {
this(nodeId, node, buildShardRoutingMap(shards));
}
@ -58,12 +65,14 @@ public class RoutingNode implements Iterable<ShardRouting> {
this.shards = shards;
this.relocatingShards = new LinkedHashSet<>();
this.initializingShards = new LinkedHashSet<>();
this.shardsByIndex = new LinkedHashMap<>();
for (ShardRouting shardRouting : shards.values()) {
if (shardRouting.initializing()) {
initializingShards.add(shardRouting);
} else if (shardRouting.relocating()) {
relocatingShards.add(shardRouting);
}
shardsByIndex.computeIfAbsent(shardRouting.index(), k -> new LinkedHashSet<>()).add(shardRouting);
}
assert invariant();
}
@ -128,6 +137,7 @@ public class RoutingNode implements Iterable<ShardRouting> {
} else if (shard.relocating()) {
relocatingShards.add(shard);
}
shardsByIndex.computeIfAbsent(shard.index(), k -> new LinkedHashSet<>()).add(shard);
assert invariant();
}
@ -148,11 +158,16 @@ public class RoutingNode implements Iterable<ShardRouting> {
boolean exist = relocatingShards.remove(oldShard);
assert exist : "expected shard " + oldShard + " to exist in relocatingShards";
}
shardsByIndex.get(oldShard.index()).remove(oldShard);
if (shardsByIndex.get(oldShard.index()).isEmpty()) {
shardsByIndex.remove(oldShard.index());
}
if (newShard.initializing()) {
initializingShards.add(newShard);
} else if (newShard.relocating()) {
relocatingShards.add(newShard);
}
shardsByIndex.computeIfAbsent(newShard.index(), k -> new LinkedHashSet<>()).add(newShard);
assert invariant();
}
@ -167,6 +182,10 @@ public class RoutingNode implements Iterable<ShardRouting> {
boolean exist = relocatingShards.remove(shard);
assert exist : "expected shard " + shard + " to exist in relocatingShards";
}
shardsByIndex.get(shard.index()).remove(shard);
if (shardsByIndex.get(shard.index()).isEmpty()) {
shardsByIndex.remove(shard.index());
}
assert invariant();
}
@ -269,6 +288,15 @@ public class RoutingNode implements Iterable<ShardRouting> {
return shards.size() - relocatingShards.size();
}
/**
 * Counts the shards of the given index that this node owns, that is, shards allocated
 * here that are not in the process of relocating away to another node.
 *
 * @param index the index whose shards should be counted
 * @return the number of non-relocating shards of {@code index} on this node, or zero
 *         if this node holds no shards of that index
 */
public int numberOfOwningShardsForIndex(final Index index) {
    final LinkedHashSet<ShardRouting> routingsForIndex = shardsByIndex.get(index);
    if (routingsForIndex == null) {
        // no shards of this index on the node at all
        return 0;
    }
    int owning = 0;
    for (final ShardRouting routing : routingsForIndex) {
        if (routing.relocating() == false) {
            owning++;
        }
    }
    return owning;
}
public String prettyPrint() {
StringBuilder sb = new StringBuilder();
sb.append("-----node_id[").append(nodeId).append("][").append(node == null ? "X" : "V").append("]\n");
@ -316,6 +344,10 @@ public class RoutingNode implements Iterable<ShardRouting> {
assert relocatingShards.size() == shardRoutingsRelocating.size();
assert relocatingShards.containsAll(shardRoutingsRelocating);
final Map<Index, Set<ShardRouting>> shardRoutingsByIndex =
shards.values().stream().collect(Collectors.groupingBy(ShardRouting::index, Collectors.toSet()));
assert shardRoutingsByIndex.equals(shardsByIndex);
return true;
}
}

View File

@ -109,28 +109,20 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
indexShardLimit, clusterShardLimit);
}
int indexShardCount = 0;
int nodeShardCount = 0;
for (ShardRouting nodeShard : node) {
// don't count relocating shards...
if (nodeShard.relocating()) {
continue;
}
nodeShardCount++;
if (nodeShard.index().equals(shardRouting.index())) {
indexShardCount++;
}
}
final int nodeShardCount = node.numberOfOwningShards();
if (clusterShardLimit > 0 && decider.test(nodeShardCount, clusterShardLimit)) {
return allocation.decision(Decision.NO, NAME,
"too many shards [%d] allocated to this node, cluster setting [%s=%d]",
nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit);
}
if (indexShardLimit > 0 && decider.test(indexShardCount, indexShardLimit)) {
return allocation.decision(Decision.NO, NAME,
"too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]",
indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit);
if (indexShardLimit > 0) {
final int indexShardCount = node.numberOfOwningShardsForIndex(shardRouting.index());
if (decider.test(indexShardCount, indexShardLimit)) {
return allocation.decision(Decision.NO, NAME,
"too many shards [%d] allocated to this node for index [%s], index setting [%s=%d]",
indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit);
}
}
return allocation.decision(Decision.YES, NAME,
"the shard count [%d] for this node is under the index limit [%d] and cluster level node limit [%d]",

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
@ -115,4 +116,17 @@ public class RoutingNodeTests extends ESTestCase {
assertThat(routingNode.numberOfOwningShards(), equalTo(2));
}
public void testNumberOfOwningShardsForIndex() {
    // Add one started shard of index "test1" and one shard of "test2" that is
    // relocating away from this node.
    final ShardRouting startedTest1Shard =
        TestShardRouting.newShardRouting("test1", 0, "node-1", false, ShardRoutingState.STARTED);
    final ShardRouting relocatingTest2Shard =
        TestShardRouting.newShardRouting("test2", 0, "node-1", "node-2", false, ShardRoutingState.RELOCATING);
    routingNode.add(startedTest1Shard);
    routingNode.add(relocatingTest2Shard);
    // NOTE(review): presumably the routingNode fixture already carries two owned shards
    // of index "test" from setUp — confirm against the fixture
    assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(2));
    // the started shard is owned, so "test1" counts one
    assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test1", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(1));
    // relocating shards are not owned, so "test2" counts zero
    assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test2", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(0));
    // an index with no shards on the node counts zero
    assertThat(routingNode.numberOfOwningShardsForIndex(new Index("test3", IndexMetaData.INDEX_UUID_NA_VALUE)), equalTo(0));
}
}