From 145374b7621cddc4c5f7815a10be85d2a1942a18 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 2 Nov 2015 15:35:39 -0700 Subject: [PATCH] Add cluster-wide setting for total shard limit This adds the `cluster.routing.allocation.total_shards_per_node` setting, which limits the total number of shards across all indices on each node. It defaults to -1 and can be dynamically configured. Resolves #14456 --- .../elasticsearch/cluster/ClusterModule.java | 3 +- .../decider/ShardsLimitAllocationDecider.java | 131 ++++++++++++++---- .../ShardsLimitAllocationTests.java | 59 ++++++++ .../allocation/total_shards.asciidoc | 11 +- 4 files changed, 174 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 31f5eb4a921..4674d143b9e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -205,6 +205,7 @@ public class ClusterModule extends AbstractModule { registerClusterDynamicSetting(TransportService.SETTING_TRACE_LOG_EXCLUDE, Validator.EMPTY); registerClusterDynamicSetting(TransportService.SETTING_TRACE_LOG_EXCLUDE + ".*", Validator.EMPTY); registerClusterDynamicSetting(TransportCloseIndexAction.SETTING_CLUSTER_INDICES_CLOSE_ENABLE, Validator.BOOLEAN); + registerClusterDynamicSetting(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, Validator.INTEGER); } private void registerBuiltinIndexSettings() { @@ -325,4 +326,4 @@ public class ClusterModule extends AbstractModule { bind(NodeMappingRefreshAction.class).asEagerSingleton(); bind(MappingUpdatedAction.class).asEagerSingleton(); } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java index 829a86666a7..3d68ed50d27 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ShardsLimitAllocationDecider.java @@ -24,13 +24,16 @@ import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.settings.NodeSettingsService; /** * This {@link AllocationDecider} limits the number of shards per node on a per - * index basis. The allocator prevents a single node to hold more than - * {@value #INDEX_TOTAL_SHARDS_PER_NODE} per index during the allocation + * index or node-wide basis. The allocator prevents a single node to hold more + * than {@value #INDEX_TOTAL_SHARDS_PER_NODE} per index and + * {@value #CLUSTER_TOTAL_SHARDS_PER_NODE} globally during the allocation * process. The limits of this decider can be changed in real-time via a the * index settings API. *

@@ -50,66 +53,140 @@ public class ShardsLimitAllocationDecider extends AllocationDecider { public static final String NAME = "shards_limit"; + private volatile int clusterShardLimit; + /** * Controls the maximum number of shards per index on a single Elasticsearch * node. Negative values are interpreted as unlimited. */ public static final String INDEX_TOTAL_SHARDS_PER_NODE = "index.routing.allocation.total_shards_per_node"; + /** + * Controls the maximum number of shards per node on a global level. + * Negative values are interpreted as unlimited. + */ + public static final String CLUSTER_TOTAL_SHARDS_PER_NODE = "cluster.routing.allocation.total_shards_per_node"; + + class ApplySettings implements NodeSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + Integer newClusterLimit = settings.getAsInt(CLUSTER_TOTAL_SHARDS_PER_NODE, null); + + if (newClusterLimit != null) { + logger.info("updating [{}] from [{}] to [{}]", CLUSTER_TOTAL_SHARDS_PER_NODE, + ShardsLimitAllocationDecider.this.clusterShardLimit, newClusterLimit); + ShardsLimitAllocationDecider.this.clusterShardLimit = newClusterLimit; + } + } + } @Inject - public ShardsLimitAllocationDecider(Settings settings) { + public ShardsLimitAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) { super(settings); + this.clusterShardLimit = settings.getAsInt(CLUSTER_TOTAL_SHARDS_PER_NODE, -1); + nodeSettingsService.addListener(new ApplySettings()); } @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index()); - int totalShardsPerNode = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1); - if (totalShardsPerNode <= 0) { - return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [%d] <= 0", totalShardsPerNode); + int indexShardLimit = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1); + // Capture the limit here in case it changes during this method's + // execution + final int clusterShardLimit = this.clusterShardLimit; + + if (indexShardLimit <= 0 && clusterShardLimit <= 0) { + return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0", + indexShardLimit, clusterShardLimit); } - int nodeCount = 0; + int indexShardCount = 0; + int nodeShardCount = 0; for (ShardRouting nodeShard : node) { - if (!nodeShard.index().equals(shardRouting.index())) { - continue; - } // don't count relocating shards... if (nodeShard.relocating()) { continue; } - nodeCount++; + nodeShardCount++; + if (nodeShard.index().equals(shardRouting.index())) { + indexShardCount++; + } } - if (nodeCount >= totalShardsPerNode) { - return allocation.decision(Decision.NO, NAME, "too many shards for this index on node [%d], limit: [%d]", - nodeCount, totalShardsPerNode); + if (clusterShardLimit > 0 && nodeShardCount >= clusterShardLimit) { + return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]", + nodeShardCount, clusterShardLimit); } - return allocation.decision(Decision.YES, NAME, "shard count under limit [%d] of total shards per node", totalShardsPerNode); + if (indexShardLimit > 0 && indexShardCount >= indexShardLimit) { + return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]", + shardRouting.index(), indexShardCount, indexShardLimit); + } + return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node", + indexShardLimit, clusterShardLimit); } @Override public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index()); - int totalShardsPerNode = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1); - if (totalShardsPerNode <= 0) { - return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [%d] <= 0", totalShardsPerNode); + int indexShardLimit = indexMd.getSettings().getAsInt(INDEX_TOTAL_SHARDS_PER_NODE, -1); + // Capture the limit here in case it changes during this method's + // execution + final int clusterShardLimit = this.clusterShardLimit; + + if (indexShardLimit <= 0 && clusterShardLimit <= 0) { + return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [index: %d, cluster: %d] <= 0", + indexShardLimit, clusterShardLimit); } - int nodeCount = 0; + int indexShardCount = 0; + int nodeShardCount = 0; for (ShardRouting nodeShard : node) { - if (!nodeShard.index().equals(shardRouting.index())) { - continue; - } // don't count relocating shards... if (nodeShard.relocating()) { continue; } - nodeCount++; + nodeShardCount++; + if (nodeShard.index().equals(shardRouting.index())) { + indexShardCount++; + } } - if (nodeCount > totalShardsPerNode) { - return allocation.decision(Decision.NO, NAME, "too many shards for this index on node [%d], limit: [%d]", - nodeCount, totalShardsPerNode); + // Subtle difference between the `canAllocate` and `canRemain` is that + // this checks > while canAllocate checks >= + if (clusterShardLimit > 0 && nodeShardCount > clusterShardLimit) { + return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]", + nodeShardCount, clusterShardLimit); } - return allocation.decision(Decision.YES, NAME, "shard count under limit [%d] of total shards per node", totalShardsPerNode); + if (indexShardLimit > 0 && indexShardCount > indexShardLimit) { + return allocation.decision(Decision.NO, NAME, "too many shards for this index [%s] on node [%d], limit: [%d]", + shardRouting.index(), indexShardCount, indexShardLimit); + } + return allocation.decision(Decision.YES, NAME, "shard count under index limit [%d] and node limit [%d] of total shards per node", + indexShardLimit, clusterShardLimit); + } + + @Override + public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) { + // Only checks the node-level limit, not the index-level + // Capture the limit here in case it changes during this method's + // execution + final int clusterShardLimit = this.clusterShardLimit; + + if (clusterShardLimit <= 0) { + return allocation.decision(Decision.YES, NAME, "total shard limit disabled: [cluster: %d] <= 0", + clusterShardLimit); + } + + int nodeShardCount = 0; + for (ShardRouting nodeShard : node) { + // don't count relocating shards... + if (nodeShard.relocating()) { + continue; + } + nodeShardCount++; + } + if (clusterShardLimit >= 0 && nodeShardCount >= clusterShardLimit) { + return allocation.decision(Decision.NO, NAME, "too many shards for this node [%d], limit: [%d]", + nodeShardCount, clusterShardLimit); + } + return allocation.decision(Decision.YES, NAME, "shard count under node limit [%d] of total shards per node", + clusterShardLimit); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsLimitAllocationTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsLimitAllocationTests.java index 9ecae9c6760..e9d6143b6cd 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsLimitAllocationTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsLimitAllocationTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESAllocationTestCase; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -87,6 +88,64 @@ public class ShardsLimitAllocationTests extends ESAllocationTestCase { clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); } + public void testClusterLevelShardsLimitAllocate() { + AllocationService strategy = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, 1) + .build()); + + logger.info("Building initial routing table"); + + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 4) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0))) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); + logger.info("Adding two nodes and performing rerouting"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); + assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); + + logger.info("Start the primary shards"); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1)); + assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(1)); + assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(2)); + + // Bump the cluster total shards to 2 + strategy = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put(ShardsLimitAllocationDecider.CLUSTER_TOTAL_SHARDS_PER_NODE, 2) + .build()); + + logger.info("Do another reroute, make sure shards are now allocated"); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); + assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); + + routingNodes = clusterState.getRoutingNodes(); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + + assertThat(clusterState.getRoutingNodes().node("node1").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(2)); + assertThat(clusterState.getRoutingNodes().node("node2").numberOfShardsWithState(ShardRoutingState.STARTED), equalTo(2)); + assertThat(clusterState.getRoutingNodes().unassigned().size(), equalTo(0)); + } + public void testIndexLevelShardsLimitRemain() { AllocationService strategy = createAllocationService(settingsBuilder() .put("cluster.routing.allocation.concurrent_recoveries", 10) diff --git a/docs/reference/index-modules/allocation/total_shards.asciidoc b/docs/reference/index-modules/allocation/total_shards.asciidoc index 3e1b3ab16e8..691ab8d937d 100644 --- a/docs/reference/index-modules/allocation/total_shards.asciidoc +++ b/docs/reference/index-modules/allocation/total_shards.asciidoc @@ -14,10 +14,17 @@ number of shards from a single index allowed per node: The maximum number of shards (replicas and primaries) that will be allocated to a single node. Defaults to unbounded. +You can also limit the amount of shards a node can have regardless of the index: + +`cluster.routing.allocation.total_shards_per_node`:: + + The maximum number of shards (replicas and primaries) that will be + allocated to a single node globally. Defaults to unbounded (-1). + [WARNING] ======================================= -This setting imposes a hard limit which can result in some shards not -being allocated. +Thess setting impose a hard limit which can result in some shards not being +allocated. Use with caution. =======================================