Always enforce cluster-wide shard limit (#34892)
This removes the option to run a cluster without enforcing the cluster-wide shard limit, making strict enforcement the default and only behavior. The limit can still be adjusted as desired using the cluster settings API.
This commit is contained in:
parent 4d525e3e33
commit 119835decd
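The commit message notes that the limit can still be adjusted through the cluster settings API. As a rough illustration only (this snippet is not part of the diff; the value 2000 and the choice of a persistent setting are arbitrary assumptions), a test built on ESIntegTestCase could raise the per-data-node limit like this, mirroring the setShardsPerNode() helper exercised by the integration tests further down:

    // Illustrative sketch: raise cluster.max_shards_per_node via the cluster settings API.
    // Assumes an ESIntegTestCase-style client(); the value 2000 is arbitrary.
    ClusterUpdateSettingsResponse response = client().admin().cluster()
        .prepareUpdateSettings()
        .setPersistentSettings(Settings.builder().put("cluster.max_shards_per_node", 2000))
        .get();
    assertTrue(response.isAcknowledged());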
@@ -26,26 +26,18 @@ API can make the cluster read-write again.
 
 ==== Cluster Shard Limit
 
-In a Elasticsearch 7.0 and later, there will be a soft limit on the number of
-shards in a cluster, based on the number of nodes in the cluster. This is
-intended to prevent operations which may unintentionally destabilize the
-cluster. Prior to 7.0, actions which would result in the cluster going over the
-limit will issue a deprecation warning.
-
-NOTE: You can set the system property `es.enforce_max_shards_per_node` to `true`
-to opt in to strict enforcement of the shard limit. If this system property is
-set, actions which would result in the cluster going over the limit will result
-in an error, rather than a deprecation warning. This property will be removed in
-Elasticsearch 7.0, as strict enforcement of the limit will be the default and
-only behavior.
+There is a soft limit on the number of shards in a cluster, based on the number
+of nodes in the cluster. This is intended to prevent operations which may
+unintentionally destabilize the cluster.
 
 If an operation, such as creating a new index, restoring a snapshot of an index,
 or opening a closed index would lead to the number of shards in the cluster
-going over this limit, the operation will issue a deprecation warning.
+going over this limit, the operation will fail with an error indicating the
+shard limit.
 
 If the cluster is already over the limit, due to changes in node membership or
-setting changes, all operations that create or open indices will issue warnings
-until either the limit is increased as described below, or some indices are
+setting changes, all operations that create or open indices will fail until
+either the limit is increased as described below, or some indices are
 <<indices-open-close,closed>> or <<indices-delete-index,deleted>> to bring the
 number of shards below the limit.
 
@@ -53,17 +45,21 @@ Replicas count towards this limit, but closed indexes do not. An index with 5
 primary shards and 2 replicas will be counted as 15 shards. Any closed index
 is counted as 0, no matter how many shards and replicas it contains.
 
-The limit defaults to 1,000 shards per node, and be dynamically adjusted using
-the following property:
+The limit defaults to 1,000 shards per data node, and can be dynamically
+adjusted using the following property:
 
 `cluster.max_shards_per_node`::
 
-Controls the number of shards allowed in the cluster per node.
+Controls the number of shards allowed in the cluster per data node.
 
 For example, a 3-node cluster with the default setting would allow 3,000 shards
 total, across all open indexes. If the above setting is changed to 1,500, then
 the cluster would allow 4,500 shards total.
 
+NOTE: If there are no data nodes in the cluster, the limit will not be enforced.
+This allows the creation of indices during cluster creation if dedicated master
+nodes are set up before data nodes.
+
 [[user-defined-data]]
 ==== User Defined Cluster Metadata
 
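The counting rule and per-data-node default in the Cluster Shard Limit documentation above can be summarized with a small worked example (illustrative only; the index sizes and node count are not taken from the diff):

    // Illustrative arithmetic for the soft limit described above.
    int primaries = 5;
    int replicas = 2;
    int openIndexCounted = primaries * (1 + replicas); // 15 shards counted for an open index
    int closedIndexCounted = 0;                        // closed indices always count as 0
    int dataNodes = 3;
    int shardsPerNode = 1000;                          // default for cluster.max_shards_per_node
    int clusterLimit = dataNodes * shardsPerNode;      // 3,000 shards allowed in total
    // Raising cluster.max_shards_per_node to 1,500 on the same 3 data nodes allows 4,500.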
@@ -602,7 +602,7 @@ public class MetaDataCreateIndexService {
             final boolean forbidPrivateIndexSettings) throws IndexCreationException {
         List<String> validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings);
 
-        Optional<String> shardAllocation = checkShardLimit(settings, clusterState, deprecationLogger);
+        Optional<String> shardAllocation = checkShardLimit(settings, clusterState);
         shardAllocation.ifPresent(validationErrors::add);
 
         if (validationErrors.isEmpty() == false) {
@@ -617,14 +617,13 @@ public class MetaDataCreateIndexService {
      *
      * @param settings The settings of the index to be created.
      * @param clusterState The current cluster state.
-     * @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate.
      * @return If present, an error message to be used to reject index creation. If empty, a signal that this operation may be carried out.
      */
-    static Optional<String> checkShardLimit(Settings settings, ClusterState clusterState, DeprecationLogger deprecationLogger) {
+    static Optional<String> checkShardLimit(Settings settings, ClusterState clusterState) {
         int shardsToCreate = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(settings)
             * (1 + IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings));
 
-        return IndicesService.checkShardLimit(shardsToCreate, clusterState, deprecationLogger);
+        return IndicesService.checkShardLimit(shardsToCreate, clusterState);
     }
 
     List<String> getIndexSettingsValidationErrors(final Settings settings, final boolean forbidPrivateIndexSettings) {
@@ -19,8 +19,8 @@
 
 package org.elasticsearch.cluster.metadata;
 
-import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -188,7 +188,7 @@ public class MetaDataIndexStateService {
                     }
                 }
 
-                validateShardLimit(currentState, request.indices(), deprecationLogger);
+                validateShardLimit(currentState, request.indices());
 
                 if (indicesToOpen.isEmpty()) {
                     return currentState;
@@ -238,16 +238,15 @@ public class MetaDataIndexStateService {
      *
      * @param currentState The current cluster state.
      * @param indices The indices which are to be opened.
-     * @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate.
      * @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled.
      */
-    static void validateShardLimit(ClusterState currentState, Index[] indices, DeprecationLogger deprecationLogger) {
+    static void validateShardLimit(ClusterState currentState, Index[] indices) {
         int shardsToOpen = Arrays.stream(indices)
             .filter(index -> currentState.metaData().index(index).getState().equals(IndexMetaData.State.CLOSE))
             .mapToInt(index -> getTotalShardCount(currentState, index))
             .sum();
 
-        Optional<String> error = IndicesService.checkShardLimit(shardsToOpen, currentState, deprecationLogger);
+        Optional<String> error = IndicesService.checkShardLimit(shardsToOpen, currentState);
         if (error.isPresent()) {
             ValidationException ex = new ValidationException();
             ex.addValidationError(error.get());
@@ -155,7 +155,7 @@ public class MetaDataUpdateSettingsService {
             int totalNewShards = Arrays.stream(request.indices())
                 .mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas))
                 .sum();
-            Optional<String> error = IndicesService.checkShardLimit(totalNewShards, currentState, deprecationLogger);
+            Optional<String> error = IndicesService.checkShardLimit(totalNewShards, currentState);
             if (error.isPresent()) {
                 ValidationException ex = new ValidationException();
                 ex.addValidationError(error.get());
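The getTotalNewShards helper summed above lies outside this hunk, so its body is not shown. The following is a hedged sketch of the per-index computation it most plausibly performs when a settings update raises the replica count; the method name here is hypothetical and everything inside is an assumption inferred from the surrounding code:

    // Hedged sketch only: how many shards a replica increase would add for one index.
    // Not copied from MetaDataUpdateSettingsService; the real helper is outside this hunk.
    private static int getTotalNewShardsSketch(Index index, ClusterState currentState, int updatedNumberOfReplicas) {
        IndexMetaData indexMetaData = currentState.metaData().index(index);
        int shardsInIndex = indexMetaData.getNumberOfShards();
        int oldNumberOfReplicas = indexMetaData.getNumberOfReplicas();
        int replicaIncrease = updatedNumberOfReplicas - oldNumberOfReplicas;
        return replicaIncrease * shardsInIndex; // only the additional copies count toward the limit
    }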
@@ -54,7 +54,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -1401,11 +1400,10 @@ public class IndicesService extends AbstractLifecycleComponent
      *
      * @param newShards The number of shards to be added by this operation
      * @param state The current cluster state
-     * @param deprecationLogger The logger to use for deprecation warnings
      * @return If present, an error message to be given as the reason for failing
      *         an operation. If empty, a sign that the operation is valid.
      */
-    public static Optional<String> checkShardLimit(int newShards, ClusterState state, DeprecationLogger deprecationLogger) {
+    public static Optional<String> checkShardLimit(int newShards, ClusterState state) {
         Settings theseSettings = state.metaData().settings();
         int nodeCount = state.getNodes().getDataNodes().size();
 
@@ -1421,13 +1419,7 @@ public class IndicesService extends AbstractLifecycleComponent
         if ((currentOpenShards + newShards) > maxShardsInCluster) {
             String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" +
                 currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open";
-            if (ENFORCE_MAX_SHARDS_PER_NODE) {
                 return Optional.of(errorMessage);
-            } else {
-                deprecationLogger.deprecated("In a future major version, this request will fail because {}. Before upgrading, " +
-                    "reduce the number of shards in your cluster or adjust the cluster setting [{}].",
-                    errorMessage, MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());
-            }
         }
         return Optional.empty();
     }
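The hunk above starts at the overflow check, so the lines that compute maxShardsInCluster and currentOpenShards are not visible here. The sketch below is a hedged, standalone reconstruction of the whole check, inferred from the documentation hunk above and from the ClusterDeprecationChecks class removed later in this diff; the method name is hypothetical, the zero-data-node guard follows the new NOTE in the docs, and the usual imports (Optional, ClusterState, MetaData) are assumed:

    // Hedged sketch of the enforcement path; not copied from IndicesService.
    public static Optional<String> checkShardLimitSketch(int newShards, ClusterState state) {
        int shardsPerNode = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(state.metaData().settings());
        int nodeCount = state.getNodes().getDataNodes().size();
        if (nodeCount == 0) {
            return Optional.empty(); // no data nodes yet (e.g. dedicated masters only), so nothing is enforced
        }
        int maxShardsInCluster = shardsPerNode * nodeCount;
        int currentOpenShards = state.getMetaData().getTotalOpenIndexShards();
        if ((currentOpenShards + newShards) > maxShardsInCluster) {
            return Optional.of("this action would add [" + newShards + "] total shards, but this cluster currently has ["
                + currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open");
        }
        return Optional.empty();
    }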
@@ -52,6 +52,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -471,7 +472,7 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
         }
     }
 
-    public void testShardLimitDeprecationWarning() {
+    public void testShardLimit() {
         int nodesInCluster = randomIntBetween(2,100);
         ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
         Settings clusterSettings = Settings.builder()
@@ -487,13 +488,13 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
             .build();
 
         DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
-        MetaDataCreateIndexService.checkShardLimit(indexSettings, state, deprecationLogger);
+        Optional<String> errorMessage = MetaDataCreateIndexService.checkShardLimit(indexSettings, state);
         int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
         int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
         int maxShards = counts.getShardsPerNode() * nodesInCluster;
-        assertWarnings("In a future major version, this request will fail because this action would add [" +
-            totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+
-            " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node].");
+        assertTrue(errorMessage.isPresent());
+        assertEquals("this action would add [" + totalShards + "] total shards, but this cluster currently has [" + currentShards
+            + "]/[" + maxShards + "] maximum shards open", errorMessage.get());
     }
 
 }
@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
+import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.Settings;
@@ -40,7 +41,7 @@ import static org.mockito.Mockito.when;
 
 public class MetaDataIndexStateServiceTests extends ESTestCase {
 
-    public void testValidateShardLimitDeprecationWarning() {
+    public void testValidateShardLimit() {
         int nodesInCluster = randomIntBetween(2,100);
         ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
         Settings clusterSettings = Settings.builder()
@@ -55,13 +56,13 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
             .toArray(new Index[2]);
 
         DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
-        MetaDataIndexStateService.validateShardLimit(state, indices, deprecationLogger);
         int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
         int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
         int maxShards = counts.getShardsPerNode() * nodesInCluster;
-        assertWarnings("In a future major version, this request will fail because this action would add [" +
-            totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+
-            " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node].");
+        ValidationException exception = expectThrows(ValidationException.class,
+            () -> MetaDataIndexStateService.validateShardLimit(state, indices));
+        assertEquals("Validation Failed: 1: this action would add [" + totalShards + "] total shards, but this cluster currently has [" +
+            currentShards + "]/[" + maxShards + "] maximum shards open;", exception.getMessage());
     }
 
     public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas,
@@ -20,11 +20,31 @@
 
 package org.elasticsearch.cluster.shards;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.snapshots.SnapshotInfo;
+import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ESIntegTestCase;
 
+import java.util.List;
+
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
+import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class ClusterShardLimitIT extends ESIntegTestCase {
     private static final String shardsPerNodeKey = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey();
@@ -55,6 +75,233 @@ public class ClusterShardLimitIT extends ESIntegTestCase {
         }
     }
 
+    public void testIndexCreationOverLimit() {
+        int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+
+        ShardCounts counts = ShardCounts.forDataNodeCount(dataNodes);
+
+        setShardsPerNode(counts.getShardsPerNode());
+
+        // Create an index that will bring us up to the limit
+        createIndex("test", Settings.builder().put(indexSettings())
+            .put(SETTING_NUMBER_OF_SHARDS, counts.getFirstIndexShards())
+            .put(SETTING_NUMBER_OF_REPLICAS, counts.getFirstIndexReplicas()).build());
+
+        try {
+            prepareCreate("should-fail", Settings.builder()
+                .put(indexSettings())
+                .put(SETTING_NUMBER_OF_SHARDS, counts.getFailingIndexShards())
+                .put(SETTING_NUMBER_OF_REPLICAS, counts.getFailingIndexReplicas())).get();
+            fail("Should not have been able to go over the limit");
+        } catch (IllegalArgumentException e) {
+            verifyException(dataNodes, counts, e);
+        }
+        ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
+        assertFalse(clusterState.getMetaData().hasIndex("should-fail"));
+    }
+
+    public void testIncreaseReplicasOverLimit() {
+        int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+
+        dataNodes = ensureMultipleDataNodes(dataNodes);
+
+        int firstShardCount = between(2, 10);
+        int shardsPerNode = firstShardCount - 1;
+        setShardsPerNode(shardsPerNode);
+
+        prepareCreate("growing-should-fail", Settings.builder()
+            .put(indexSettings())
+            .put(SETTING_NUMBER_OF_SHARDS, firstShardCount)
+            .put(SETTING_NUMBER_OF_REPLICAS, 0)).get();
+
+        try {
+            client().admin().indices().prepareUpdateSettings("growing-should-fail")
+                .setSettings(Settings.builder().put("number_of_replicas", dataNodes)).get();
+            fail("shouldn't be able to increase the number of replicas");
+        } catch (IllegalArgumentException e) {
+            String expectedError = "Validation Failed: 1: this action would add [" + (dataNodes * firstShardCount)
+                + "] total shards, but this cluster currently has [" + firstShardCount + "]/[" + dataNodes * shardsPerNode
+                + "] maximum shards open;";
+            assertEquals(expectedError, e.getMessage());
+        }
+        MetaData clusterState = client().admin().cluster().prepareState().get().getState().metaData();
+        assertEquals(0, clusterState.index("growing-should-fail").getNumberOfReplicas());
+    }
+
+    public void testChangingMultipleIndicesOverLimit() {
+        int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+
+        dataNodes = ensureMultipleDataNodes(dataNodes);
+
+        // Create two indexes: One that ends up with fewer shards, and one
+        // that ends up with more to verify that we check the _total_ number of
+        // shards the operation would add.
+
+        int firstIndexFactor = between (5, 10);
+        int firstIndexShards = firstIndexFactor * dataNodes;
+        int firstIndexReplicas = 0;
+
+        int secondIndexFactor = between(1, 3);
+        int secondIndexShards = secondIndexFactor * dataNodes;
+        int secondIndexReplicas = dataNodes;
+
+        int shardsPerNode = firstIndexFactor + (secondIndexFactor * (1 + secondIndexReplicas));
+        setShardsPerNode(shardsPerNode);
+
+        createIndex("test-1-index", Settings.builder().put(indexSettings())
+            .put(SETTING_NUMBER_OF_SHARDS, firstIndexShards)
+            .put(SETTING_NUMBER_OF_REPLICAS, firstIndexReplicas).build());
+        createIndex("test-2-index", Settings.builder().put(indexSettings())
+            .put(SETTING_NUMBER_OF_SHARDS, secondIndexShards)
+            .put(SETTING_NUMBER_OF_REPLICAS, secondIndexReplicas).build());
+        try {
+            client().admin().indices()
+                .prepareUpdateSettings(randomFrom("_all", "test-*", "*-index"))
+                .setSettings(Settings.builder().put("number_of_replicas", dataNodes - 1))
+                .get();
+            fail("should not have been able to increase shards above limit");
+        } catch (IllegalArgumentException e) {
+            int totalShardsBefore = (firstIndexShards * (1 + firstIndexReplicas)) + (secondIndexShards * (1 + secondIndexReplicas));
+            int totalShardsAfter = (dataNodes) * (firstIndexShards + secondIndexShards);
+            int difference = totalShardsAfter - totalShardsBefore;
+
+            String expectedError = "Validation Failed: 1: this action would add [" + difference
+                + "] total shards, but this cluster currently has [" + totalShardsBefore + "]/[" + dataNodes * shardsPerNode
+                + "] maximum shards open;";
+            assertEquals(expectedError, e.getMessage());
+        }
+        MetaData clusterState = client().admin().cluster().prepareState().get().getState().metaData();
+        assertEquals(firstIndexReplicas, clusterState.index("test-1-index").getNumberOfReplicas());
+        assertEquals(secondIndexReplicas, clusterState.index("test-2-index").getNumberOfReplicas());
+    }
+
+    public void testPreserveExistingSkipsCheck() {
+        int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+
+        dataNodes = ensureMultipleDataNodes(dataNodes);
+
+        int firstShardCount = between(2, 10);
+        int shardsPerNode = firstShardCount - 1;
+        setShardsPerNode(shardsPerNode);
+
+        prepareCreate("test-index", Settings.builder()
+            .put(indexSettings())
+            .put(SETTING_NUMBER_OF_SHARDS, firstShardCount)
+            .put(SETTING_NUMBER_OF_REPLICAS, 0)).get();
+
+        // Since a request with preserve_existing can't change the number of
+        // replicas, we should never get an error here.
+        assertAcked(client().admin().indices()
+            .prepareUpdateSettings("test-index")
+            .setPreserveExisting(true)
+            .setSettings(Settings.builder().put("number_of_replicas", dataNodes))
+            .get());
+        ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
+        assertEquals(0, clusterState.getMetaData().index("test-index").getNumberOfReplicas());
+    }
+
+    public void testRestoreSnapshotOverLimit() {
+        Client client = client();
+
+        logger.info("--> creating repository");
+        Settings.Builder repoSettings = Settings.builder();
+        repoSettings.put("location", randomRepoPath());
+        repoSettings.put("compress", randomBoolean());
+        repoSettings.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES);
+
+        assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(repoSettings.build()));
+
+        int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+        ShardCounts counts = ShardCounts.forDataNodeCount(dataNodes);
+        createIndex("snapshot-index", Settings.builder().put(indexSettings())
+            .put(SETTING_NUMBER_OF_SHARDS, counts.getFailingIndexShards())
+            .put(SETTING_NUMBER_OF_REPLICAS, counts.getFailingIndexReplicas()).build());
+        ensureGreen();
+
+        logger.info("--> snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster()
+            .prepareCreateSnapshot("test-repo", "test-snap")
+            .setWaitForCompletion(true)
+            .setIndices("snapshot-index").get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+
+        List<SnapshotInfo> snapshotInfos = client.admin().cluster().prepareGetSnapshots("test-repo")
+            .setSnapshots("test-snap").get().getSnapshots();
+        assertThat(snapshotInfos.size(), equalTo(1));
+        SnapshotInfo snapshotInfo = snapshotInfos.get(0);
+        assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
+        assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));
+
+        // Test restore after index deletion
+        logger.info("--> delete indices");
+        cluster().wipeIndices("snapshot-index");
+
+        // Reduce the shard limit and fill it up
+        setShardsPerNode(counts.getShardsPerNode());
+        createIndex("test-fill", Settings.builder().put(indexSettings())
+            .put(SETTING_NUMBER_OF_SHARDS, counts.getFirstIndexShards())
+            .put(SETTING_NUMBER_OF_REPLICAS, counts.getFirstIndexReplicas()).build());
+
+        logger.info("--> restore one index after deletion");
+        try {
+            RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster()
+                .prepareRestoreSnapshot("test-repo", "test-snap")
+                .setWaitForCompletion(true).setIndices("snapshot-index").execute().actionGet();
+            fail("Should not have been able to restore snapshot in full cluster");
+        } catch (IllegalArgumentException e) {
+            verifyException(dataNodes, counts, e);
+        }
+        ensureGreen();
+        ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
+        assertFalse(clusterState.getMetaData().hasIndex("snapshot-index"));
+    }
+
+    public void testOpenIndexOverLimit() {
+        Client client = client();
+        int dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+        ShardCounts counts = ShardCounts.forDataNodeCount(dataNodes);
+
+        createIndex("test-index-1", Settings.builder().put(indexSettings())
+            .put(SETTING_NUMBER_OF_SHARDS, counts.getFailingIndexShards())
+            .put(SETTING_NUMBER_OF_REPLICAS, counts.getFailingIndexReplicas()).build());
+
+        ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
+        assertFalse(healthResponse.isTimedOut());
+
+        AcknowledgedResponse closeIndexResponse = client.admin().indices().prepareClose("test-index-1").execute().actionGet();
+        assertTrue(closeIndexResponse.isAcknowledged());
+
+        // Fill up the cluster
+        setShardsPerNode(counts.getShardsPerNode());
+        createIndex("test-fill", Settings.builder().put(indexSettings())
+            .put(SETTING_NUMBER_OF_SHARDS, counts.getFirstIndexShards())
+            .put(SETTING_NUMBER_OF_REPLICAS, counts.getFirstIndexReplicas()).build());
+
+        try {
+            client.admin().indices().prepareOpen("test-index-1").execute().actionGet();
+            fail("should not have been able to open index");
+        } catch (IllegalArgumentException e) {
+            verifyException(dataNodes, counts, e);
+        }
+        ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
+        assertFalse(clusterState.getMetaData().hasIndex("snapshot-index"));
+    }
+
+    private int ensureMultipleDataNodes(int dataNodes) {
+        if (dataNodes == 1) {
+            internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).build());
+            assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes(">=2").setLocal(true)
+                .execute().actionGet().isTimedOut(), equalTo(false));
+            dataNodes = client().admin().cluster().prepareState().get().getState().getNodes().getDataNodes().size();
+        }
+        return dataNodes;
+    }
+
     private void setShardsPerNode(int shardsPerNode) {
         try {
             ClusterUpdateSettingsResponse response;
@@ -76,6 +323,15 @@ public class ClusterShardLimitIT extends ESIntegTestCase {
         }
     }
 
+    private void verifyException(int dataNodes, ShardCounts counts, IllegalArgumentException e) {
+        int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
+        int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
+        int maxShards = counts.getShardsPerNode() * dataNodes;
+        String expectedError = "Validation Failed: 1: this action would add [" + totalShards
+            + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open;";
+        assertEquals(expectedError, e.getMessage());
+    }
+
     public static class ShardCounts {
         private final int shardsPerNode;
 
@@ -36,7 +36,6 @@ import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.io.FileSystemUtils;
-import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -585,16 +584,14 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
             clusterSettings);
 
         int shardsToAdd = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
-        DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
-        Optional<String> errorMessage = IndicesService.checkShardLimit(shardsToAdd, state, deprecationLogger);
+        Optional<String> errorMessage = IndicesService.checkShardLimit(shardsToAdd, state);
 
         int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
         int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
         int maxShards = counts.getShardsPerNode() * nodesInCluster;
-        assertWarnings("In a future major version, this request will fail because this action would add [" +
-            totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+
-            " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node].");
-        assertFalse(errorMessage.isPresent());
+        assertTrue(errorMessage.isPresent());
+        assertEquals("this action would add [" + totalShards + "] total shards, but this cluster currently has [" + currentShards
+            + "]/[" + maxShards + "] maximum shards open", errorMessage.get());
     }
 
     public void testUnderShardLimit() {
@@ -611,8 +608,7 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
 
         int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
         int shardsToAdd = randomIntBetween(1, (counts.getShardsPerNode() * nodesInCluster) - existingShards);
-        DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
-        Optional<String> errorMessage = IndicesService.checkShardLimit(shardsToAdd, state, deprecationLogger);
+        Optional<String> errorMessage = IndicesService.checkShardLimit(shardsToAdd, state);
 
         assertFalse(errorMessage.isPresent());
     }
@@ -1,30 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
-package org.elasticsearch.xpack.deprecation;
-
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
-
-public class ClusterDeprecationChecks {
-
-    static DeprecationIssue checkShardLimit(ClusterState state) {
-        int shardsPerNode = MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(state.metaData().settings());
-        int nodeCount = state.getNodes().getDataNodes().size();
-        int maxShardsInCluster = shardsPerNode * nodeCount;
-        int currentOpenShards = state.getMetaData().getTotalOpenIndexShards();
-
-        if (currentOpenShards >= maxShardsInCluster) {
-            return new DeprecationIssue(DeprecationIssue.Level.WARNING,
-                "Number of open shards exceeds cluster soft limit",
-                "https://www.elastic.co/guide/en/elasticsearch/reference/master/breaking_70_cluster_changes.html",
-                "There are [" + currentOpenShards + "] open shards in this cluster, but the cluster is limited to [" +
-                    shardsPerNode + "] per data node, for [" + maxShardsInCluster + "] maximum.");
-        }
-        return null;
-    }
-}
@@ -28,9 +28,7 @@ public class DeprecationChecks {
     }
 
     static List<Function<ClusterState, DeprecationIssue>> CLUSTER_SETTINGS_CHECKS =
-        Collections.unmodifiableList(Arrays.asList(
-            ClusterDeprecationChecks::checkShardLimit
-        ));
+        Collections.emptyList();
 
     static List<BiFunction<List<NodeInfo>, List<NodeStats>, DeprecationIssue>> NODE_SETTINGS_CHECKS =
         Collections.unmodifiableList(Arrays.asList(