Merge remote-tracking branch 'dakrone/node-limit-decider'

Lee Hinman 2015-11-09 11:03:19 -07:00
commit 25f8db9aa9
4 changed files with 174 additions and 30 deletions


@@ -205,6 +205,7 @@ public class ClusterModule extends AbstractModule {
         registerClusterDynamicSetting(TransportService.SETTING_TRACE_LOG_EXCLUDE, Validator.EMPTY);
         registerClusterDynamicSetting(TransportService.SETTING_TRACE_LOG_EXCLUDE + ".*", Validator.EMPTY);
         registerClusterDynamicSetting(TransportCloseIndexAction.SETTING_CLUSTER_INDICES_CLOSE_ENABLE, Validator.BOOLEAN);
+        registerClusterDynamicSetting(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, Validator.INTEGER);
     }

     private void registerBuiltinIndexSettings() {


@@ -24,13 +24,16 @@ import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.settings.NodeSettingsService;

 /**
  * This {@link AllocationDecider} limits the number of shards per node on a per
- * index basis. The allocator prevents a single node from holding more than
- * {@value #INDEX_TOTAL_SHARDS_PER_NODE} per index during the allocation
+ * index or node-wide basis. The allocator prevents a single node from holding
+ * more than {@value #INDEX_TOTAL_SHARDS_PER_NODE} per index and
+ * {@value #CLUSTER_TOTAL_SHARDS_PER_NODE} globally during the allocation
  * process. The limits of this decider can be changed in real-time via the
  * index settings API.
  * <p>
@@ -50,66 +53,140 @@ public class ShardsLimitAllocationDecider extends AllocationDecider {

     public static final String NAME = "shards_limit";

+    private volatile int clusterShardLimit;
+
     /**
      * Controls the maximum number of shards per index on a single Elasticsearch
      * node. Negative values are interpreted as unlimited.
      */
     public static final String INDEX_TOTAL_SHARDS_PER_NODE = "index.routing.allocation.total_shards_per_node";

+    /**
+     * Controls the maximum number of shards per node on a global level.
+     * Negative values are interpreted as unlimited.
+     */
+    public static final String CLUSTER_TOTAL_SHARDS_PER_NODE = "cluster.routing.allocation.total_shards_per_node";
+
+    class ApplySettings implements NodeSettingsService.Listener {
+        @Override
+        public void onRefreshSettings(Settings settings) {
+            Integer newClusterLimit = settings.getAsInt(CLUSTER_TOTAL_SHARDS_PER_NODE, null);
+
+            if (newClusterLimit != null) {
+                logger.info("updating [{}] from [{}] to [{}]", CLUSTER_TOTAL_SHARDS_PER_NODE,
+                        ShardsLimitAllocationDecider.this.clusterShardLimit, newClusterLimit);
+                ShardsLimitAllocationDecider.this.clusterShardLimit = newClusterLimit;
+            }
+        }
+    }
+
     @Inject
-    public ShardsLimitAllocationDecider(Settings settings) {
+    public ShardsLimitAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
         super(settings);
+        this.clusterShardLimit = settings.getAsInt(CLUSTER_TOTAL_SHARDS_PER_NODE, -1);
+        nodeSettingsService.addListener(new ApplySettings());
     }

     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
-        int totalShardsPerNode = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
-        if (totalShardsPerNode <= 0) {
-            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
+        int indexShardLimit = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
+        // Capture the limit here in case it changes during this method's
+        // execution
+        final int clusterShardLimit = this.clusterShardLimit;
+
+        if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
+            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
+                    indexShardLimit, clusterShardLimit);
         }

-        int nodeCount = 0;
+        int indexShardCount = 0;
+        int nodeShardCount = 0;
         for (ShardRouting nodeShard : node) {
-            if (!nodeShard.index().equals(shardRouting.index())) {
-                continue;
-            }
             // don't count relocating shards...
             if (nodeShard.relocating()) {
                 continue;
             }
-            nodeCount++;
+            nodeShardCount++;
+            if (nodeShard.index().equals(shardRouting.index())) {
+                indexShardCount++;
+            }
         }
-        if (nodeCount >= totalShardsPerNode) {
-            return allocation.decision(Decision.NO, NAME, "too many shards for this index on node [%d], limit: [%d]",
-                    nodeCount, totalShardsPerNode);
+        if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+                    nodeShardCount, clusterShardLimit);
         }
-        return allocation.decision(Decision.YES, NAME, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
+        if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
+                    shardRouting.index(), indexShardCount, indexShardLimit);
+        }
+        return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
+                indexShardLimit, clusterShardLimit);
     }

     @Override
     public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
         IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
-        int totalShardsPerNode = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
-        if (totalShardsPerNode <= 0) {
-            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [%d] <= 0", totalShardsPerNode);
+        int indexShardLimit = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1);
+        // Capture the limit here in case it changes during this method's
+        // execution
+        final int clusterShardLimit = this.clusterShardLimit;
+
+        if (indexShardLimit <= 0 && clusterShardLimit <= 0) {
+            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0",
+                    indexShardLimit, clusterShardLimit);
         }

-        int nodeCount = 0;
+        int indexShardCount = 0;
+        int nodeShardCount = 0;
         for (ShardRouting nodeShard : node) {
-            if (!nodeShard.index().equals(shardRouting.index())) {
-                continue;
-            }
             // don't count relocating shards...
             if (nodeShard.relocating()) {
                 continue;
             }
-            nodeCount++;
+            nodeShardCount++;
+            if (nodeShard.index().equals(shardRouting.index())) {
+                indexShardCount++;
+            }
         }
-        if (nodeCount > totalShardsPerNode) {
-            return allocation.decision(Decision.NO, NAME, "too many shards for this index on node [%d], limit: [%d]",
-                    nodeCount, totalShardsPerNode);
+        // The subtle difference between `canAllocate` and `canRemain` is that
+        // this checks > while canAllocate checks >=
+        if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+                    nodeShardCount, clusterShardLimit);
         }
-        return allocation.decision(Decision.YES, NAME, "shard count under limit [%d] of total shards per node", totalShardsPerNode);
+        if (indexShardLimit > 0 && indexShardCount > indexShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]",
+                    shardRouting.index(), indexShardCount, indexShardLimit);
+        }
+        return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node",
+                indexShardLimit, clusterShardLimit);
+    }
+
+    @Override
+    public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
+        // Only checks the node-level limit, not the index-level
+        // Capture the limit here in case it changes during this method's
+        // execution
+        final int clusterShardLimit = this.clusterShardLimit;
+
+        if (clusterShardLimit <= 0) {
+            return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [cluster: %d] <= 0",
+                    clusterShardLimit);
+        }
+
+        int nodeShardCount = 0;
+        for (ShardRouting nodeShard : node) {
+            // don't count relocating shards...
+            if (nodeShard.relocating()) {
+                continue;
+            }
+            nodeShardCount++;
+        }
+        if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) {
+            return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]",
+                    nodeShardCount, clusterShardLimit);
+        }
+        return allocation.decision(Decision.YES, NAME, "shard count under node limit [%d] of total shards per node",
+                clusterShardLimit);
     }
 }
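The `>` versus `>=` asymmetry called out in the `canRemain` comment is deliberate: `canAllocate` rejects a node that is already at the limit, while `canRemain` only fails once the node is strictly over it, so a node that exactly hits the limit keeps its shards instead of shedding them. A standalone sketch (hypothetical numbers, not code from this commit) of the two checks:

    // Illustrative only: mirrors the decider's threshold logic with made-up numbers.
    public class LimitAsymmetryDemo {
        public static void main(String[] args) {
            int limit = 2;        // hypothetical per-node shard limit
            int shardsOnNode = 2; // node holds exactly the limit

            boolean canAllocate = !(shardsOnNode >= limit); // false: no new shard may land here
            boolean canRemain = !(shardsOnNode > limit);    // true: existing shards stay put

            System.out.println("canAllocate=" + canAllocate + ", canRemain=" + canRemain);
        }
    }

Flipping `canRemain` to `>=` would make a node at the limit simultaneously refuse new shards and evict existing ones, causing allocation churn.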


@@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESAllocationTestCase;

 import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@@ -87,6 +88,64 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase {
         clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
     }

+    public void testClusterLevelShardsLimitAllocate() {
+        AllocationService strategy = createAllocationService(settingsBuilder()
+                .put("cluster.routing.allocation.concurrent_recoveries", 10)
+                .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, 1)
+                .build());
+
+        logger.info("Building initial routing table");
+
+        MetaData metaData = MetaData.builder()
+                .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 4)
+                        .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)))
+                .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+                .addAsNew(metaData.index("test"))
+                .build();
+
+        ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+        logger.info("Adding two nodes and performing rerouting");
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
+
+        logger.info("Start the primary shards");
+        RoutingNodes routingNodes = clusterState.getRoutingNodes();
+        routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(2));
+
+        // Bump the cluster total shards to 2
+        strategy = createAllocationService(settingsBuilder()
+                .put("cluster.routing.allocation.concurrent_recoveries", 10)
+                .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, 2)
+                .build());
+
+        logger.info("Do another reroute, make sure shards are now allocated");
+        routingTable = strategy.reroute(clusterState).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
+        assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1));
+
+        routingNodes = clusterState.getRoutingNodes();
+        routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
+        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+
+        assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(2));
+        assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(0));
+    }
+
     public void testIndexLevelShardsLimitRemain() {
         AllocationService strategy = createAllocationService(settingsBuilder()
                 .put("cluster.routing.allocation.concurrent_recoveries", 10)
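The test's assertions follow from simple arithmetic: two nodes with a per-node limit of one can hold at most two of the four shards, and raising the limit to two makes room for the rest. A hypothetical helper (`assignable` is not part of the codebase) capturing that bound:

    // Illustrative only: upper bound on assignable shards under a per-node limit.
    public class ClusterLimitMath {
        static int assignable(int totalShards, int nodes, int perNodeLimit) {
            // Non-positive limits mean "unbounded", matching the decider's contract.
            return perNodeLimit <= 0 ? totalShards : Math.min(totalShards, nodes * perNodeLimit);
        }

        public static void main(String[] args) {
            System.out.println(assignable(4, 2, 1)); // 2 -> two shards stay unassigned
            System.out.println(assignable(4, 2, 2)); // 4 -> raising the limit allocates the rest
        }
    }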

@@ -14,10 +14,17 @@ number of shards from a single index allowed per node:
     The maximum number of shards (replicas and primaries) that will be
     allocated to a single node. Defaults to unbounded.

+You can also limit the number of shards a node can have regardless of the index:
+
+`cluster.routing.allocation.total_shards_per_node`::
+
+    The maximum number of shards (replicas and primaries) that will be
+    allocated to a single node globally. Defaults to unbounded (-1).
+
 [WARNING]
 =======================================
-This setting imposes a hard limit which can result in some shards not
-being allocated.
+These settings impose a hard limit which can result in some shards not being
+allocated.

 Use with caution.
 =======================================
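Because `ClusterModule` registers `cluster.routing.allocation.total_shards_per_node` as a cluster-dynamic setting (validated with `Validator.INTEGER`), it can be changed at runtime through the cluster update settings API rather than only in the node configuration. A minimal sketch using the 2.x Java client (obtaining the connected `Client` is assumed and not shown):

    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.Settings;

    public class RaiseClusterShardLimit {
        // Applies the limit as a transient cluster setting; pass -1 to restore "unbounded".
        static void setClusterShardLimit(Client client, int limit) {
            client.admin().cluster().prepareUpdateSettings()
                    .setTransientSettings(Settings.settingsBuilder()
                            .put("cluster.routing.allocation.total_shards_per_node", limit)
                            .build())
                    .get();
        }
    }

On the next reroute, the `ApplySettings` listener picks up the new value and the decider enforces it cluster-wide.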