Refactor shard limit allocation decider

This commit simplifies the shard limit allocation decider by extracting
duplicated code into a common method.

Relates #21845
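
The gist of the change is to keep a single copy of the limit-checking logic and pass in the one piece that differs between canAllocate and canRemain, the comparison against the limit, as a java.util.function.BiPredicate. A minimal, self-contained illustration of that pattern follows; the class and method names here are invented for the example and are not the Elasticsearch types shown in the diff below.

import java.util.function.BiPredicate;

public class LimitCheckDemo {

    // Shared logic: the comparison that differs between callers is injected
    // as a BiPredicate instead of being duplicated in two methods.
    private static boolean violatesLimit(int count, int limit, BiPredicate<Integer, Integer> decider) {
        return limit > 0 && decider.test(count, limit);
    }

    // "Allocate" refuses once the node has reached the limit (>=) ...
    static boolean canAllocate(int shardsOnNode, int limit) {
        return !violatesLimit(shardsOnNode, limit, (count, lim) -> count >= lim);
    }

    // ... while "remain" only refuses once the limit is exceeded (>).
    static boolean canRemain(int shardsOnNode, int limit) {
        return !violatesLimit(shardsOnNode, limit, (count, lim) -> count > lim);
    }

    public static void main(String[] args) {
        System.out.println(canAllocate(2, 2)); // false: the node is already at the limit
        System.out.println(canRemain(2, 2));   // true: sitting at the limit may stay
    }
}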
makeyang 2016-12-02 10:27:02 +08:00 committed by Jason Tedor
parent 1e645bc2d6
commit 3f1d7be07a
1 changed file with 12 additions and 40 deletions


@@ -29,6 +29,8 @@ import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
 
+import java.util.function.BiPredicate;
+
 /**
  * This {@link AllocationDecider} limits the number of shards per node on a per
  * index or node-wide basis. The allocator prevents a single node to hold more
@@ -83,46 +85,17 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
-        IndexMetaData indexMd = allocation.metaData().getIndexSafe(shardRouting.index());
-        final int indexShardLimit = INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(indexMd.getSettings(), settings);
-        // Capture the limit here in case it changes during this method's
-        // execution
-        final int clusterShardLimit = this.clusterShardLimit;
-
-        if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
-            return allocation.decision(Decision.YES, NAME, "total shard limits are disabled: [index: %d, cluster: %d] <= 0",
-                    indexShardLimit, clusterShardLimit);
-        }
-
-        int indexShardCount = 0;
-        int nodeShardCount = 0;
-        for (ShardRouting nodeShard : node) {
-            // don't count relocating shards...
-            if (nodeShard.relocating()) {
-                continue;
-            }
-            nodeShardCount++;
-            if (nodeShard.index().equals(shardRouting.index())) {
-                indexShardCount++;
-            }
-        }
-
-        if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) {
-            return allocation.decision(Decision.NO, NAME,
-                "too many shards [%d] allocated to this node, [%s=%d]",
-                nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit);
-        }
-        if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) {
-            return allocation.decision(Decision.NO, NAME,
-                "too many shards [%d] allocated to this node for index [%s], [%s=%d]",
-                indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit);
-        }
-        return allocation.decision(Decision.YES, NAME,
-            "the shard count [%d] for this node is under the index limit [%d] and cluster level node limit [%d]",
-            nodeShardCount, indexShardLimit, clusterShardLimit);
+        return doDecide(shardRouting, node, allocation, (count, limit) -> count >= limit);
     }
 
     @Override
     public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
+        return doDecide(shardRouting, node, allocation, (count, limit) -> count > limit);
+    }
+
+    private Decision doDecide(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation,
+                              BiPredicate<Integer, Integer> decider) {
         IndexMetaData indexMd = allocation.metaData().getIndexSafe(shardRouting.index());
         final int indexShardLimit = INDEX_TOTAL_SHARDS_PER_NODE_SETTING.get(indexMd.getSettings(), settings);
         // Capture the limit here in case it changes during this method's
@@ -146,14 +119,13 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {
                 indexShardCount++;
             }
         }
 
-        // Subtle difference between the `canAllocate` and `canRemain` is that
-        // this checks > while canAllocate checks >=
-        if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) {
+        if (clusterShardLimit > 0 && decider.test(nodeShardCount, clusterShardLimit)) {
             return allocation.decision(Decision.NO, NAME,
                 "too many shards [%d] allocated to this node, [%s=%d]",
                 nodeShardCount, CLUSTER_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), clusterShardLimit);
         }
-        if (indexShardLimit > 0 && indexShardCount > indexShardLimit) {
+        if (indexShardLimit > 0 && decider.test(indexShardCount, indexShardLimit)) {
             return allocation.decision(Decision.NO, NAME,
                 "too many shards [%d] allocated to this node for index [%s], [%s=%d]",
                 indexShardCount, shardRouting.getIndexName(), INDEX_TOTAL_SHARDS_PER_NODE_SETTING.getKey(), indexShardLimit);