Handle `cluster.max_shards_per_node` in YAML config (#57234)
Prior to this commit, `cluster.max_shards_per_node` is not correctly handled when it is set via the YAML config file, only when it is set via the Cluster Settings API. This commit refactors how the limit is implemented, both to enable correctly handling the setting in the YAML and to more effectively centralize the logic used to enforce the limit. The logic used to apply the limit, as well as the setting value, has been moved to the new `ShardLimitValidator`.
This commit is contained in:
parent
9c7a5c7b83
commit
5a4e5a1e9d
|
@ -32,6 +32,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
|
|||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotState;
|
||||
|
@ -48,7 +49,7 @@ import static org.hamcrest.Matchers.greaterThan;
|
|||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
|
||||
public class ClusterShardLimitIT extends ESIntegTestCase {
|
||||
private static final String shardsPerNodeKey = Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey();
|
||||
private static final String shardsPerNodeKey = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey();
|
||||
|
||||
public void testSettingClusterMaxShards() {
|
||||
int shardsPerNode = between(1, 500_000);
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.index.mapper.MapperParsingException;
|
||||
import org.elasticsearch.indices.IndexClosedException;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
|
@ -491,14 +492,14 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
|
|||
final Metadata metadata = state.getMetadata();
|
||||
final Metadata brokenMeta = Metadata.builder(metadata).persistentSettings(Settings.builder()
|
||||
.put(metadata.persistentSettings()).put("this.is.unknown", true)
|
||||
.put(Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), "broken").build()).build();
|
||||
.put(ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), "broken").build()).build();
|
||||
restartNodesOnBrokenClusterState(ClusterState.builder(state).metadata(brokenMeta));
|
||||
|
||||
ensureYellow("test"); // wait for state recovery
|
||||
state = client().admin().cluster().prepareState().get().getState();
|
||||
assertEquals("true", state.metadata().persistentSettings().get("archived.this.is.unknown"));
|
||||
assertEquals("broken", state.metadata().persistentSettings().get("archived."
|
||||
+ Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()));
|
||||
+ ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()));
|
||||
|
||||
// delete these settings
|
||||
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder().putNull("archived.*")).get();
|
||||
|
@ -506,7 +507,7 @@ public class GatewayIndexStateIT extends ESIntegTestCase {
|
|||
state = client().admin().cluster().prepareState().get().getState();
|
||||
assertNull(state.metadata().persistentSettings().get("archived.this.is.unknown"));
|
||||
assertNull(state.metadata().persistentSettings().get("archived."
|
||||
+ Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()));
|
||||
+ ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()));
|
||||
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
|
||||
}
|
||||
|
||||
|
|
|
@ -131,9 +131,6 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
|
|||
EnumSet<XContentContext> context();
|
||||
}
|
||||
|
||||
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE =
|
||||
Setting.intSetting("cluster.max_shards_per_node", 1000, 1, Property.Dynamic, Property.NodeScope);
|
||||
|
||||
public static final Setting<Boolean> SETTING_READ_ONLY_SETTING =
|
||||
Setting.boolSetting("cluster.blocks.read_only", false, Property.Dynamic, Property.NodeScope);
|
||||
|
||||
|
|
|
@ -73,6 +73,7 @@ import org.elasticsearch.index.query.QueryShardContext;
|
|||
import org.elasticsearch.indices.IndexCreationException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.InvalidIndexNameException;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
|
@ -89,7 +90,6 @@ import java.util.List;
|
|||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.BiConsumer;
|
||||
|
@ -128,6 +128,7 @@ public class MetadataCreateIndexService {
|
|||
private final ActiveShardsObserver activeShardsObserver;
|
||||
private final NamedXContentRegistry xContentRegistry;
|
||||
private final Collection<SystemIndexDescriptor> systemIndexDescriptors;
|
||||
private final ShardLimitValidator shardLimitValidator;
|
||||
private final boolean forbidPrivateIndexSettings;
|
||||
|
||||
public MetadataCreateIndexService(
|
||||
|
@ -136,6 +137,7 @@ public class MetadataCreateIndexService {
|
|||
final IndicesService indicesService,
|
||||
final AllocationService allocationService,
|
||||
final AliasValidator aliasValidator,
|
||||
final ShardLimitValidator shardLimitValidator,
|
||||
final Environment env,
|
||||
final IndexScopedSettings indexScopedSettings,
|
||||
final ThreadPool threadPool,
|
||||
|
@ -153,6 +155,7 @@ public class MetadataCreateIndexService {
|
|||
this.xContentRegistry = xContentRegistry;
|
||||
this.systemIndexDescriptors = systemIndexDescriptors;
|
||||
this.forbidPrivateIndexSettings = forbidPrivateIndexSettings;
|
||||
this.shardLimitValidator = shardLimitValidator;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -465,7 +468,7 @@ public class MetadataCreateIndexService {
|
|||
|
||||
final Settings aggregatedIndexSettings =
|
||||
aggregateIndexSettings(currentState, request, MetadataIndexTemplateService.resolveSettings(templates), mappings,
|
||||
null, settings, indexScopedSettings);
|
||||
null, settings, indexScopedSettings, shardLimitValidator);
|
||||
int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null);
|
||||
IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards);
|
||||
|
||||
|
@ -499,7 +502,7 @@ public class MetadataCreateIndexService {
|
|||
final Settings aggregatedIndexSettings =
|
||||
aggregateIndexSettings(currentState, request,
|
||||
MetadataIndexTemplateService.resolveSettings(currentState.metadata(), templateName),
|
||||
mappings, null, settings, indexScopedSettings);
|
||||
mappings, null, settings, indexScopedSettings, shardLimitValidator);
|
||||
int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null);
|
||||
IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards);
|
||||
|
||||
|
@ -539,8 +542,8 @@ public class MetadataCreateIndexService {
|
|||
Collections.unmodifiableMap(MapperService.parseMapping(xContentRegistry, sourceMappings)));
|
||||
}
|
||||
|
||||
final Settings aggregatedIndexSettings =
|
||||
aggregateIndexSettings(currentState, request, Settings.EMPTY, mappings, sourceMetadata, settings, indexScopedSettings);
|
||||
final Settings aggregatedIndexSettings = aggregateIndexSettings(currentState, request, Settings.EMPTY, mappings, sourceMetadata,
|
||||
settings, indexScopedSettings, shardLimitValidator);
|
||||
final int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, sourceMetadata);
|
||||
IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards);
|
||||
|
||||
|
@ -710,7 +713,7 @@ public class MetadataCreateIndexService {
|
|||
static Settings aggregateIndexSettings(ClusterState currentState, CreateIndexClusterStateUpdateRequest request,
|
||||
Settings templateSettings, Map<String, Map<String, Object>> mappings,
|
||||
@Nullable IndexMetadata sourceMetadata, Settings settings,
|
||||
IndexScopedSettings indexScopedSettings) {
|
||||
IndexScopedSettings indexScopedSettings, ShardLimitValidator shardLimitValidator) {
|
||||
Settings.Builder indexSettingsBuilder = Settings.builder();
|
||||
if (sourceMetadata == null) {
|
||||
indexSettingsBuilder.put(templateSettings);
|
||||
|
@ -759,14 +762,14 @@ public class MetadataCreateIndexService {
|
|||
|
||||
Settings indexSettings = indexSettingsBuilder.build();
|
||||
/*
|
||||
* We can not check the shard limit until we have applied templates, otherwise we do not know the actual number of shards
|
||||
* We can not validate settings until we have applied templates, otherwise we do not know the actual settings
|
||||
* that will be used to create this index.
|
||||
*/
|
||||
MetadataCreateIndexService.checkShardLimit(indexSettings, currentState);
|
||||
shardLimitValidator.validateShardLimit(indexSettings, currentState);
|
||||
if (indexSettings.getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) == false) {
|
||||
DEPRECATION_LOGGER.deprecatedAndMaybeLog("soft_deletes_disabled",
|
||||
"Creating indices with soft-deletes disabled is deprecated and will be removed in future Elasticsearch versions. " +
|
||||
"Please do not specify value for setting [index.soft_deletes.enabled] of index [" + request.index() + "].");
|
||||
"Please do not specify value for setting [index.soft_deletes.enabled] of index [" + request.index() + "].");
|
||||
}
|
||||
validateTranslogRetentionSettings(indexSettings);
|
||||
return indexSettings;
|
||||
|
@ -1009,26 +1012,6 @@ public class MetadataCreateIndexService {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether an index can be created without going over the cluster shard limit.
|
||||
*
|
||||
* @param settings the settings of the index to be created
|
||||
* @param clusterState the current cluster state
|
||||
* @throws ValidationException if creating this index would put the cluster over the cluster shard limit
|
||||
*/
|
||||
public static void checkShardLimit(final Settings settings, final ClusterState clusterState) {
|
||||
final int numberOfShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(settings);
|
||||
final int numberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
|
||||
final int shardsToCreate = numberOfShards * (1 + numberOfReplicas);
|
||||
|
||||
final Optional<String> shardLimit = IndicesService.checkShardLimit(shardsToCreate, clusterState);
|
||||
if (shardLimit.isPresent()) {
|
||||
final ValidationException e = new ValidationException();
|
||||
e.addValidationError(shardLimit.get());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
List<String> getIndexSettingsValidationErrors(final Settings settings, final boolean forbidPrivateIndexSettings) {
|
||||
List<String> validationErrors = validateIndexCustomPath(settings, env.sharedDataFile());
|
||||
if (forbidPrivateIndexSettings) {
|
||||
|
|
|
@ -52,7 +52,6 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -66,6 +65,7 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.snapshots.RestoreService;
|
||||
import org.elasticsearch.snapshots.SnapshotInProgressException;
|
||||
|
@ -82,7 +82,6 @@ import java.util.HashMap;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -106,6 +105,7 @@ public class MetadataIndexStateService {
|
|||
private final AllocationService allocationService;
|
||||
private final MetadataIndexUpgradeService metadataIndexUpgradeService;
|
||||
private final IndicesService indicesService;
|
||||
private final ShardLimitValidator shardLimitValidator;
|
||||
private final ThreadPool threadPool;
|
||||
private final TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction;
|
||||
private final ActiveShardsObserver activeShardsObserver;
|
||||
|
@ -113,7 +113,7 @@ public class MetadataIndexStateService {
|
|||
@Inject
|
||||
public MetadataIndexStateService(ClusterService clusterService, AllocationService allocationService,
|
||||
MetadataIndexUpgradeService metadataIndexUpgradeService,
|
||||
IndicesService indicesService, ThreadPool threadPool,
|
||||
IndicesService indicesService, ShardLimitValidator shardLimitValidator, ThreadPool threadPool,
|
||||
TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction) {
|
||||
this.indicesService = indicesService;
|
||||
this.clusterService = clusterService;
|
||||
|
@ -121,6 +121,7 @@ public class MetadataIndexStateService {
|
|||
this.threadPool = threadPool;
|
||||
this.transportVerifyShardBeforeCloseAction = transportVerifyShardBeforeCloseAction;
|
||||
this.metadataIndexUpgradeService = metadataIndexUpgradeService;
|
||||
this.shardLimitValidator = shardLimitValidator;
|
||||
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
|
||||
}
|
||||
|
||||
|
@ -553,7 +554,7 @@ public class MetadataIndexStateService {
|
|||
}
|
||||
}
|
||||
|
||||
validateShardLimit(currentState, indices);
|
||||
shardLimitValidator.validateShardLimit(currentState, indices);
|
||||
if (indicesToOpen.isEmpty()) {
|
||||
return currentState;
|
||||
}
|
||||
|
@ -603,33 +604,6 @@ public class MetadataIndexStateService {
|
|||
return ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates whether a list of indices can be opened without going over the cluster shard limit. Only counts indices which are
|
||||
* currently closed and will be opened, ignores indices which are already open.
|
||||
*
|
||||
* @param currentState The current cluster state.
|
||||
* @param indices The indices which are to be opened.
|
||||
* @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled.
|
||||
*/
|
||||
static void validateShardLimit(ClusterState currentState, Index[] indices) {
|
||||
int shardsToOpen = Arrays.stream(indices)
|
||||
.filter(index -> currentState.metadata().index(index).getState().equals(IndexMetadata.State.CLOSE))
|
||||
.mapToInt(index -> getTotalShardCount(currentState, index))
|
||||
.sum();
|
||||
|
||||
Optional<String> error = IndicesService.checkShardLimit(shardsToOpen, currentState);
|
||||
if (error.isPresent()) {
|
||||
ValidationException ex = new ValidationException();
|
||||
ex.addValidationError(error.get());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
private static int getTotalShardCount(ClusterState state, Index index) {
|
||||
IndexMetadata indexMetadata = state.metadata().index(index);
|
||||
return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Generates a {@link ClusterBlock} that blocks read and write operations on soon-to-be-closed indices. The
|
||||
* cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID.
|
||||
|
|
|
@ -19,8 +19,8 @@
|
|||
|
||||
package org.elasticsearch.cluster.metadata;
|
||||
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
|
@ -45,6 +45,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -70,16 +71,19 @@ public class MetadataUpdateSettingsService {
|
|||
|
||||
private final IndexScopedSettings indexScopedSettings;
|
||||
private final IndicesService indicesService;
|
||||
private final ShardLimitValidator shardLimitValidator;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
@Inject
|
||||
public MetadataUpdateSettingsService(ClusterService clusterService, AllocationService allocationService,
|
||||
IndexScopedSettings indexScopedSettings, IndicesService indicesService, ThreadPool threadPool) {
|
||||
IndexScopedSettings indexScopedSettings, IndicesService indicesService,
|
||||
ShardLimitValidator shardLimitValidator, ThreadPool threadPool) {
|
||||
this.clusterService = clusterService;
|
||||
this.threadPool = threadPool;
|
||||
this.allocationService = allocationService;
|
||||
this.indexScopedSettings = indexScopedSettings;
|
||||
this.indicesService = indicesService;
|
||||
this.shardLimitValidator = shardLimitValidator;
|
||||
}
|
||||
|
||||
public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request,
|
||||
|
@ -154,7 +158,7 @@ public class MetadataUpdateSettingsService {
|
|||
int totalNewShards = Arrays.stream(request.indices())
|
||||
.mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas))
|
||||
.sum();
|
||||
Optional<String> error = IndicesService.checkShardLimit(totalNewShards, currentState);
|
||||
Optional<String> error = shardLimitValidator.checkShardLimit(totalNewShards, currentState);
|
||||
if (error.isPresent()) {
|
||||
ValidationException ex = new ValidationException();
|
||||
ex.addValidationError(error.get());
|
||||
|
|
|
@ -89,6 +89,7 @@ import org.elasticsearch.indices.IndexingMemoryController;
|
|||
import org.elasticsearch.indices.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.IndicesRequestCache;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.indices.analysis.HunspellService;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
||||
|
@ -224,7 +225,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
|
|||
MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING,
|
||||
Metadata.SETTING_READ_ONLY_SETTING,
|
||||
Metadata.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
|
||||
Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
|
||||
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
|
||||
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
|
||||
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
|
||||
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
|
||||
|
|
|
@ -40,7 +40,6 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
@ -82,8 +81,8 @@ import org.elasticsearch.core.internal.io.IOUtils;
|
|||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
import org.elasticsearch.env.ShardLockObtainFailedException;
|
||||
import org.elasticsearch.gateway.MetadataStateFormat;
|
||||
import org.elasticsearch.gateway.MetaStateService;
|
||||
import org.elasticsearch.gateway.MetadataStateFormat;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
|
@ -1547,36 +1546,6 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
this.idFieldDataEnabled = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit. Adds a deprecation
|
||||
* warning or returns an error message as appropriate
|
||||
*
|
||||
* @param newShards The number of shards to be added by this operation
|
||||
* @param state The current cluster state
|
||||
* @return If present, an error message to be given as the reason for failing
|
||||
* an operation. If empty, a sign that the operation is valid.
|
||||
*/
|
||||
public static Optional<String> checkShardLimit(int newShards, ClusterState state) {
|
||||
Settings theseSettings = state.metadata().settings();
|
||||
int nodeCount = state.getNodes().getDataNodes().size();
|
||||
|
||||
// Only enforce the shard limit if we have at least one data node, so that we don't block
|
||||
// index creation during cluster setup
|
||||
if (nodeCount == 0 || newShards < 0) {
|
||||
return Optional.empty();
|
||||
}
|
||||
int maxShardsPerNode = Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(theseSettings);
|
||||
int maxShardsInCluster = maxShardsPerNode * nodeCount;
|
||||
int currentOpenShards = state.getMetadata().getTotalOpenIndexShards();
|
||||
|
||||
if ((currentOpenShards + newShards) > maxShardsInCluster) {
|
||||
String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" +
|
||||
currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open";
|
||||
return Optional.of(errorMessage);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
private void updateDanglingIndicesInfo(Index index) {
|
||||
assert DiscoveryNode.isDataNode(settings) : "dangling indices information should only be persisted on data nodes";
|
||||
assert nodeWriteDanglingIndicesInfo : "writing dangling indices info is not enabled";
|
||||
|
|
|
@ -0,0 +1,146 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices;
|
||||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
|
||||
|
||||
/**
|
||||
* This class contains the logic used to check the cluster-wide shard limit before shards are created and ensuring that the limit is
|
||||
* updated correctly on setting updates, etc.
|
||||
*
|
||||
* NOTE: This is the limit applied at *shard creation time*. If you are looking for the limit applied at *allocation* time, which is
|
||||
* controlled by a different setting,
|
||||
* see {@link org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider}.
|
||||
*/
|
||||
public class ShardLimitValidator {
|
||||
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE =
|
||||
Setting.intSetting("cluster.max_shards_per_node", 1000, 1, Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||
protected final AtomicInteger shardLimitPerNode = new AtomicInteger();
|
||||
|
||||
public ShardLimitValidator(final Settings settings, ClusterService clusterService) {
|
||||
this.shardLimitPerNode.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings));
|
||||
clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE, this::setShardLimitPerNode);
|
||||
}
|
||||
|
||||
private void setShardLimitPerNode(int newValue) {
|
||||
this.shardLimitPerNode.set(newValue);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_NODE} setting.
|
||||
* @return the current value of the setting
|
||||
*/
|
||||
public int getShardLimitPerNode() {
|
||||
return shardLimitPerNode.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether an index can be created without going over the cluster shard limit.
|
||||
*
|
||||
* @param settings the settings of the index to be created
|
||||
* @param state the current cluster state
|
||||
* @throws ValidationException if creating this index would put the cluster over the cluster shard limit
|
||||
*/
|
||||
public void validateShardLimit(final Settings settings, final ClusterState state) {
|
||||
final int numberOfShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(settings);
|
||||
final int numberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
|
||||
final int shardsToCreate = numberOfShards * (1 + numberOfReplicas);
|
||||
|
||||
final Optional<String> shardLimit = checkShardLimit(shardsToCreate, state);
|
||||
if (shardLimit.isPresent()) {
|
||||
final ValidationException e = new ValidationException();
|
||||
e.addValidationError(shardLimit.get());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates whether a list of indices can be opened without going over the cluster shard limit. Only counts indices which are
|
||||
* currently closed and will be opened, ignores indices which are already open.
|
||||
*
|
||||
* @param currentState The current cluster state.
|
||||
* @param indicesToOpen The indices which are to be opened.
|
||||
* @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled.
|
||||
*/
|
||||
public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen) {
|
||||
int shardsToOpen = Arrays.stream(indicesToOpen)
|
||||
.filter(index -> currentState.metadata().index(index).getState().equals(IndexMetadata.State.CLOSE))
|
||||
.mapToInt(index -> getTotalShardCount(currentState, index))
|
||||
.sum();
|
||||
|
||||
Optional<String> error = checkShardLimit(shardsToOpen, currentState);
|
||||
if (error.isPresent()) {
|
||||
ValidationException ex = new ValidationException();
|
||||
ex.addValidationError(error.get());
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
private static int getTotalShardCount(ClusterState state, Index index) {
|
||||
IndexMetadata indexMetadata = state.metadata().index(index);
|
||||
return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit.
|
||||
* Returns an error message if appropriate, or an empty {@link Optional} otherwise.
|
||||
*
|
||||
* @param newShards The number of shards to be added by this operation
|
||||
* @param state The current cluster state
|
||||
* @return If present, an error message to be given as the reason for failing
|
||||
* an operation. If empty, a sign that the operation is valid.
|
||||
*/
|
||||
public Optional<String> checkShardLimit(int newShards, ClusterState state) {
|
||||
return checkShardLimit(newShards, state, getShardLimitPerNode());
|
||||
}
|
||||
|
||||
// package-private for testing
|
||||
static Optional<String> checkShardLimit(int newShards, ClusterState state, int maxShardsPerNodeSetting) {
|
||||
int nodeCount = state.getNodes().getDataNodes().size();
|
||||
|
||||
// Only enforce the shard limit if we have at least one data node, so that we don't block
|
||||
// index creation during cluster setup
|
||||
if (nodeCount == 0 || newShards < 0) {
|
||||
return Optional.empty();
|
||||
}
|
||||
int maxShardsPerNode = maxShardsPerNodeSetting;
|
||||
int maxShardsInCluster = maxShardsPerNode * nodeCount;
|
||||
int currentOpenShards = state.getMetadata().getTotalOpenIndexShards();
|
||||
|
||||
if ((currentOpenShards + newShards) > maxShardsInCluster) {
|
||||
String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" +
|
||||
currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open";
|
||||
return Optional.of(errorMessage);
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
|
@ -108,6 +108,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry;
|
|||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.indices.analysis.AnalysisModule;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
|
@ -480,18 +481,21 @@ public class Node implements Closeable {
|
|||
|
||||
final AliasValidator aliasValidator = new AliasValidator();
|
||||
|
||||
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
|
||||
final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(
|
||||
settings,
|
||||
clusterService,
|
||||
indicesService,
|
||||
clusterModule.getAllocationService(),
|
||||
aliasValidator,
|
||||
shardLimitValidator,
|
||||
environment,
|
||||
settingsModule.getIndexScopedSettings(),
|
||||
threadPool,
|
||||
xContentRegistry,
|
||||
systemIndexDescriptors,
|
||||
forbidPrivateIndexSettings);
|
||||
forbidPrivateIndexSettings
|
||||
);
|
||||
|
||||
final MetadataCreateDataStreamService metadataCreateDataStreamService =
|
||||
new MetadataCreateDataStreamService(threadPool, clusterService, metadataCreateIndexService);
|
||||
|
@ -547,7 +551,7 @@ public class Node implements Closeable {
|
|||
TransportNodesSnapshotsStatus nodesSnapshotsStatus = new TransportNodesSnapshotsStatus(threadPool, clusterService,
|
||||
transportService, snapshotShardsService, actionModule.getActionFilters());
|
||||
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
|
||||
metadataCreateIndexService, metadataIndexUpgradeService, clusterService.getClusterSettings());
|
||||
metadataCreateIndexService, metadataIndexUpgradeService, clusterService.getClusterSettings(), shardLimitValidator);
|
||||
|
||||
final RerouteService rerouteService
|
||||
= new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
|
||||
|
@ -639,6 +643,7 @@ public class Node implements Closeable {
|
|||
b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus);
|
||||
b.bind(RestoreService.class).toInstance(restoreService);
|
||||
b.bind(RerouteService.class).toInstance(rerouteService);
|
||||
b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator);
|
||||
}
|
||||
);
|
||||
injector = modules.createInjector();
|
||||
|
|
|
@ -68,6 +68,7 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
|
@ -88,10 +89,10 @@ import static java.util.Collections.emptySet;
|
|||
import static java.util.Collections.unmodifiableSet;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_HISTORY_UUID;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_HISTORY_UUID;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_UPGRADED;
|
||||
import static org.elasticsearch.common.util.set.Sets.newHashSet;
|
||||
|
@ -151,13 +152,16 @@ public class RestoreService implements ClusterStateApplier {
|
|||
|
||||
private final MetadataIndexUpgradeService metadataIndexUpgradeService;
|
||||
|
||||
private final ShardLimitValidator shardLimitValidator;
|
||||
|
||||
private final ClusterSettings clusterSettings;
|
||||
|
||||
private final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor;
|
||||
|
||||
public RestoreService(ClusterService clusterService, RepositoriesService repositoriesService,
|
||||
AllocationService allocationService, MetadataCreateIndexService createIndexService,
|
||||
MetadataIndexUpgradeService metadataIndexUpgradeService, ClusterSettings clusterSettings) {
|
||||
MetadataIndexUpgradeService metadataIndexUpgradeService, ClusterSettings clusterSettings,
|
||||
ShardLimitValidator shardLimitValidator) {
|
||||
this.clusterService = clusterService;
|
||||
this.repositoriesService = repositoriesService;
|
||||
this.allocationService = allocationService;
|
||||
|
@ -166,6 +170,7 @@ public class RestoreService implements ClusterStateApplier {
|
|||
clusterService.addStateApplier(this);
|
||||
this.clusterSettings = clusterSettings;
|
||||
this.cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();
|
||||
this.shardLimitValidator = shardLimitValidator;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -290,7 +295,7 @@ public class RestoreService implements ClusterStateApplier {
|
|||
indexMdBuilder.settings(Settings.builder()
|
||||
.put(snapshotIndexMetadata.getSettings())
|
||||
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID()));
|
||||
MetadataCreateIndexService.checkShardLimit(snapshotIndexMetadata.getSettings(), currentState);
|
||||
shardLimitValidator.validateShardLimit(snapshotIndexMetadata.getSettings(), currentState);
|
||||
if (!request.includeAliases() && !snapshotIndexMetadata.getAliases().isEmpty()) {
|
||||
// Remove all aliases - they shouldn't be restored
|
||||
indexMdBuilder.removeAllAliases();
|
||||
|
|
|
@ -31,13 +31,13 @@ import org.elasticsearch.cluster.metadata.AliasAction;
|
|||
import org.elasticsearch.cluster.metadata.AliasMetadata;
|
||||
import org.elasticsearch.cluster.metadata.AliasValidator;
|
||||
import org.elasticsearch.cluster.metadata.ComponentTemplate;
|
||||
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.DataStreamTests;
|
||||
import org.elasticsearch.cluster.metadata.IndexAbstraction;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
|
||||
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
|
||||
import org.elasticsearch.cluster.metadata.MetadataIndexAliasesService;
|
||||
|
@ -56,6 +56,7 @@ import org.elasticsearch.index.IndexService;
|
|||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.test.ClusterServiceUtils;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
|
@ -470,8 +471,10 @@ public class MetadataRolloverServiceTests extends ESTestCase {
|
|||
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
|
||||
when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg());
|
||||
|
||||
ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService);
|
||||
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(Settings.EMPTY,
|
||||
clusterService, indicesService, allocationService, null, env, null, testThreadPool, null, Collections.emptyList(), false);
|
||||
clusterService, indicesService, allocationService, null, shardLimitValidator, env, null,
|
||||
testThreadPool, null, Collections.emptyList(), false);
|
||||
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService,
|
||||
new AliasValidator(), null, xContentRegistry());
|
||||
MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService,
|
||||
|
@ -538,8 +541,10 @@ public class MetadataRolloverServiceTests extends ESTestCase {
|
|||
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
|
||||
when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg());
|
||||
|
||||
ShardLimitValidator shardLimitValidator = new ShardLimitValidator(Settings.EMPTY, clusterService);
|
||||
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(Settings.EMPTY,
|
||||
clusterService, indicesService, allocationService, null, env, null, testThreadPool, null, Collections.emptyList(), false);
|
||||
clusterService, indicesService, allocationService, null, shardLimitValidator, env, null,
|
||||
testThreadPool, null, Collections.emptyList(), false);
|
||||
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService,
|
||||
new AliasValidator(), null, xContentRegistry());
|
||||
MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService,
|
||||
|
@ -597,7 +602,8 @@ public class MetadataRolloverServiceTests extends ESTestCase {
|
|||
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
|
||||
|
||||
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(Settings.EMPTY,
|
||||
clusterService, indicesService, allocationService, null, env, null, testThreadPool, null, Collections.emptyList(), false);
|
||||
clusterService, indicesService, allocationService, null, null, env,
|
||||
null, testThreadPool, null, Collections.emptyList(), false);
|
||||
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService,
|
||||
new AliasValidator(), null, xContentRegistry());
|
||||
MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService,
|
||||
|
|
|
@ -39,9 +39,8 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
|||
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
|
||||
import org.elasticsearch.cluster.shards.ShardCounts;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
|
@ -58,6 +57,7 @@ import org.elasticsearch.index.mapper.MapperService;
|
|||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.indices.InvalidAliasNameException;
|
||||
import org.elasticsearch.indices.InvalidIndexNameException;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.test.ClusterServiceUtils;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -77,7 +77,6 @@ import java.util.Comparator;
|
|||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
|
@ -106,14 +105,12 @@ import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.clus
|
|||
import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards;
|
||||
import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings;
|
||||
import static org.elasticsearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases;
|
||||
import static org.elasticsearch.cluster.shards.ShardCounts.forDataNodeCount;
|
||||
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
|
||||
import static org.elasticsearch.indices.IndicesServiceTests.createClusterForShardLimitTest;
|
||||
import static org.elasticsearch.indices.ShardLimitValidatorTests.createTestShardLimitService;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasKey;
|
||||
import static org.hamcrest.Matchers.hasToString;
|
||||
import static org.hamcrest.Matchers.hasValue;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -130,7 +127,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
public void setupCreateIndexRequestAndAliasValidator() {
|
||||
aliasValidator = new AliasValidator();
|
||||
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
|
||||
Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
Settings indexSettings = Settings.builder().put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build();
|
||||
queryShardContext = new QueryShardContext(0,
|
||||
new IndexSettings(IndexMetadata.builder("test").settings(indexSettings).build(), indexSettings),
|
||||
|
@ -483,17 +480,16 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testValidateIndexName() throws Exception {
|
||||
ThreadPool testThreadPool = new TestThreadPool(getTestName());
|
||||
try {
|
||||
withTemporaryClusterService(((clusterService, threadPool) -> {
|
||||
MetadataCreateIndexService checkerService = new MetadataCreateIndexService(
|
||||
Settings.EMPTY,
|
||||
ClusterServiceUtils.createClusterService(testThreadPool),
|
||||
clusterService,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
createTestShardLimitService(randomIntBetween(1, 1000), clusterService), null,
|
||||
null,
|
||||
null,
|
||||
testThreadPool,
|
||||
threadPool,
|
||||
null,
|
||||
Collections.emptyList(),
|
||||
false
|
||||
|
@ -511,9 +507,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
validateIndexName(checkerService, "..", "must not be '.' or '..'");
|
||||
|
||||
validateIndexName(checkerService, "foo:bar", "must not contain ':'");
|
||||
} finally {
|
||||
testThreadPool.shutdown();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private void validateIndexName(MetadataCreateIndexService metadataCreateIndexService, String indexName, String errorMessage) {
|
||||
|
@ -558,36 +552,6 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testShardLimit() {
|
||||
int nodesInCluster = randomIntBetween(2,90);
|
||||
ShardCounts counts = forDataNodeCount(nodesInCluster);
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode())
|
||||
.build();
|
||||
ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(),
|
||||
clusterSettings);
|
||||
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(SETTING_NUMBER_OF_SHARDS, counts.getFailingIndexShards())
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, counts.getFailingIndexReplicas())
|
||||
.build();
|
||||
|
||||
final ValidationException e = expectThrows(
|
||||
ValidationException.class,
|
||||
() -> MetadataCreateIndexService.checkShardLimit(indexSettings, state));
|
||||
int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
|
||||
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
|
||||
int maxShards = counts.getShardsPerNode() * nodesInCluster;
|
||||
final String expectedMessage = String.format(
|
||||
Locale.ROOT,
|
||||
"this action would add [%d] total shards, but this cluster currently has [%d]/[%d] maximum shards open",
|
||||
totalShards,
|
||||
currentShards,
|
||||
maxShards);
|
||||
assertThat(e, hasToString(containsString(expectedMessage)));
|
||||
}
|
||||
|
||||
public void testValidateDotIndex() {
|
||||
List<SystemIndexDescriptor> systemIndexDescriptors = new ArrayList<>();
|
||||
systemIndexDescriptors.add(new SystemIndexDescriptor(".test", "test"));
|
||||
|
@ -595,17 +559,16 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
systemIndexDescriptors.add(new SystemIndexDescriptor(".pattern-test*", "test-1"));
|
||||
systemIndexDescriptors.add(new SystemIndexDescriptor(".pattern-test-overlapping", "test-2"));
|
||||
|
||||
ThreadPool testThreadPool = new TestThreadPool(getTestName());
|
||||
try {
|
||||
withTemporaryClusterService(((clusterService, threadPool) -> {
|
||||
MetadataCreateIndexService checkerService = new MetadataCreateIndexService(
|
||||
Settings.EMPTY,
|
||||
ClusterServiceUtils.createClusterService(testThreadPool),
|
||||
clusterService,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
createTestShardLimitService(randomIntBetween(1, 1000), clusterService), null,
|
||||
null,
|
||||
null,
|
||||
testThreadPool,
|
||||
threadPool,
|
||||
null,
|
||||
systemIndexDescriptors,
|
||||
false
|
||||
|
@ -634,10 +597,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
containsString("index name [.pattern-test-overlapping] is claimed as a system index by multiple system index patterns:"));
|
||||
assertThat(exception.getMessage(), containsString("pattern: [.pattern-test*], description: [test-1]"));
|
||||
assertThat(exception.getMessage(), containsString("pattern: [.pattern-test-overlapping], description: [test-2]"));
|
||||
|
||||
} finally {
|
||||
testThreadPool.shutdown();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
public void testParseMappingsAppliesDataFromTemplateAndRequest() throws Exception {
|
||||
|
@ -674,7 +634,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
request.settings(Settings.builder().put("request_setting", "value2").build());
|
||||
|
||||
Settings aggregatedIndexSettings = aggregateIndexSettings(clusterState, request, templateMetadata.settings(), emptyMap(),
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService());
|
||||
|
||||
assertThat(aggregatedIndexSettings.get("template_setting"), equalTo("value1"));
|
||||
assertThat(aggregatedIndexSettings.get("request_setting"), equalTo("value2"));
|
||||
|
@ -711,7 +671,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
MetadataIndexTemplateService.resolveAliases(Collections.singletonList(templateMetadata)),
|
||||
Metadata.builder().build(), aliasValidator, xContentRegistry(), queryShardContext);
|
||||
Settings aggregatedIndexSettings = aggregateIndexSettings(ClusterState.EMPTY_STATE, request, templateMetadata.settings(),
|
||||
emptyMap(), null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
emptyMap(), null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService());
|
||||
|
||||
assertThat(resolvedAliases.get(0).getSearchRouting(), equalTo("fromRequest"));
|
||||
assertThat(aggregatedIndexSettings.get("key1"), equalTo("requestValue"));
|
||||
|
@ -727,14 +687,15 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
|
||||
public void testDefaultSettings() {
|
||||
Settings aggregatedIndexSettings = aggregateIndexSettings(ClusterState.EMPTY_STATE, request, Settings.EMPTY, emptyMap(),
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService());
|
||||
|
||||
assertThat(aggregatedIndexSettings.get(SETTING_NUMBER_OF_SHARDS), equalTo("1"));
|
||||
}
|
||||
|
||||
public void testSettingsFromClusterState() {
|
||||
Settings aggregatedIndexSettings = aggregateIndexSettings(ClusterState.EMPTY_STATE, request, Settings.EMPTY, emptyMap(),
|
||||
null, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 15).build(), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
null, Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 15).build(), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
|
||||
randomShardLimitService());
|
||||
|
||||
assertThat(aggregatedIndexSettings.get(SETTING_NUMBER_OF_SHARDS), equalTo("15"));
|
||||
}
|
||||
|
@ -758,7 +719,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
));
|
||||
Settings aggregatedIndexSettings = aggregateIndexSettings(ClusterState.EMPTY_STATE, request,
|
||||
MetadataIndexTemplateService.resolveSettings(templates), emptyMap(),
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService());
|
||||
List<AliasMetadata> resolvedAliases = resolveAndValidateAliases(request.index(), request.aliases(),
|
||||
MetadataIndexTemplateService.resolveAliases(templates),
|
||||
Metadata.builder().build(), aliasValidator, xContentRegistry(), queryShardContext);
|
||||
|
@ -785,7 +746,8 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
Settings.builder().put("index.blocks.write", true).build());
|
||||
|
||||
Settings aggregatedIndexSettings = aggregateIndexSettings(clusterState, request, templateMetadata.settings(), emptyMap(),
|
||||
clusterState.metadata().index("sourceIndex"), Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
clusterState.metadata().index("sourceIndex"), Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
|
||||
randomShardLimitService());
|
||||
|
||||
assertThat(aggregatedIndexSettings.get("templateSetting"), is(nullValue()));
|
||||
assertThat(aggregatedIndexSettings.get("requestSetting"), is("requestValue"));
|
||||
|
@ -975,7 +937,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
|
||||
request.settings(Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), false).build());
|
||||
aggregateIndexSettings(ClusterState.EMPTY_STATE, request, Settings.EMPTY, Collections.emptyMap(),
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService());
|
||||
assertWarnings("Creating indices with soft-deletes disabled is deprecated and will be removed in future Elasticsearch versions. "
|
||||
+ "Please do not specify value for setting [index.soft_deletes.enabled] of index [test].");
|
||||
request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test");
|
||||
|
@ -983,7 +945,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
request.settings(Settings.builder().put(INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
|
||||
}
|
||||
aggregateIndexSettings(ClusterState.EMPTY_STATE, request, Settings.EMPTY, Collections.emptyMap(),
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService());
|
||||
}
|
||||
|
||||
public void testValidateTranslogRetentionSettings() {
|
||||
|
@ -996,7 +958,7 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
}
|
||||
request.settings(settings.build());
|
||||
aggregateIndexSettings(ClusterState.EMPTY_STATE, request, Settings.EMPTY, Collections.emptyMap(),
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
null, Settings.EMPTY, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, randomShardLimitService());
|
||||
assertWarnings("Translog retention settings [index.translog.retention.age] "
|
||||
+ "and [index.translog.retention.size] are deprecated and effectively ignored. They will be removed in a future version.");
|
||||
}
|
||||
|
@ -1167,4 +1129,17 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
|||
}
|
||||
return converted;
|
||||
}
|
||||
private ShardLimitValidator randomShardLimitService() {
|
||||
return createTestShardLimitService(randomIntBetween(10,10000));
|
||||
}
|
||||
|
||||
private void withTemporaryClusterService(BiConsumer<ClusterService, ThreadPool> consumer) {
|
||||
ThreadPool threadPool = new TestThreadPool(getTestName());
|
||||
try {
|
||||
final ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
|
||||
consumer.accept(clusterService, threadPool);
|
||||
} finally {
|
||||
threadPool.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,9 +37,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
|
|||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.shards.ShardCounts;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
@ -53,7 +51,6 @@ import org.elasticsearch.snapshots.SnapshotInfoTests;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -72,15 +69,12 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_C
|
|||
import static org.elasticsearch.cluster.metadata.MetadataIndexStateService.INDEX_CLOSED_BLOCK;
|
||||
import static org.elasticsearch.cluster.metadata.MetadataIndexStateService.INDEX_CLOSED_BLOCK_ID;
|
||||
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
|
||||
import static org.elasticsearch.cluster.shards.ShardCounts.forDataNodeCount;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class MetadataIndexStateServiceTests extends ESTestCase {
|
||||
|
||||
|
@ -333,29 +327,6 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
|
|||
assertEquals(blockedIndices.get(test), blockedIndices2.get(test));
|
||||
}
|
||||
|
||||
public void testValidateShardLimit() {
|
||||
int nodesInCluster = randomIntBetween(2, 90);
|
||||
ShardCounts counts = forDataNodeCount(nodesInCluster);
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode())
|
||||
.build();
|
||||
ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(),
|
||||
counts.getFailingIndexShards(), counts.getFailingIndexReplicas(), clusterSettings);
|
||||
|
||||
Index[] indices = Arrays.stream(state.metadata().indices().values().toArray(IndexMetadata.class))
|
||||
.map(IndexMetadata::getIndex)
|
||||
.collect(Collectors.toList())
|
||||
.toArray(new Index[2]);
|
||||
|
||||
int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
|
||||
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
|
||||
int maxShards = counts.getShardsPerNode() * nodesInCluster;
|
||||
ValidationException exception = expectThrows(ValidationException.class,
|
||||
() -> MetadataIndexStateService.validateShardLimit(state, indices));
|
||||
assertEquals("Validation Failed: 1: this action would add [" + totalShards + "] total shards, but this cluster currently has [" +
|
||||
currentShards + "]/[" + maxShards + "] maximum shards open;", exception.getMessage());
|
||||
}
|
||||
|
||||
public void testIsIndexVerifiedBeforeClosed() {
|
||||
final ClusterState initialState = ClusterState.builder(new ClusterName("testIsIndexMetadataClosed")).build();
|
||||
{
|
||||
|
@ -409,33 +380,11 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
|
|||
assertThat(failedIndices, equalTo(disappearedIndices));
|
||||
}
|
||||
|
||||
public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas,
|
||||
int closedIndexShards, int closedIndexReplicas, Settings clusterSettings) {
|
||||
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder();
|
||||
for (int i = 0; i < nodesInCluster; i++) {
|
||||
dataNodes.put(randomAlphaOfLengthBetween(5, 15), mock(DiscoveryNode.class));
|
||||
}
|
||||
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
|
||||
when(nodes.getDataNodes()).thenReturn(dataNodes.build());
|
||||
|
||||
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build();
|
||||
state = addOpenedIndex(randomAlphaOfLengthBetween(5, 15), openIndexShards, openIndexReplicas, state);
|
||||
state = addClosedIndex(randomAlphaOfLengthBetween(5, 15), closedIndexShards, closedIndexReplicas, state);
|
||||
|
||||
final Metadata.Builder metadata = Metadata.builder(state.metadata());
|
||||
if (randomBoolean()) {
|
||||
metadata.persistentSettings(clusterSettings);
|
||||
} else {
|
||||
metadata.transientSettings(clusterSettings);
|
||||
}
|
||||
return ClusterState.builder(state).metadata(metadata).nodes(nodes).build();
|
||||
}
|
||||
|
||||
private static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
|
||||
public static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
|
||||
return addIndex(state, index, numShards, numReplicas, IndexMetadata.State.OPEN, null);
|
||||
}
|
||||
|
||||
private static ClusterState addClosedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
|
||||
public static ClusterState addClosedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
|
||||
return addIndex(state, index, numShards, numReplicas, IndexMetadata.State.CLOSE, INDEX_CLOSED_BLOCK);
|
||||
}
|
||||
|
||||
|
|
|
@ -60,6 +60,7 @@ import java.util.stream.Collectors;
|
|||
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.elasticsearch.common.settings.Settings.builder;
|
||||
import static org.elasticsearch.indices.ShardLimitValidatorTests.createTestShardLimitService;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.containsStringIgnoringCase;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
|
@ -870,12 +871,14 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
createTestShardLimitService(randomIntBetween(1, 1000)),
|
||||
new Environment(builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(), null),
|
||||
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
|
||||
null,
|
||||
xContentRegistry,
|
||||
Collections.emptyList(),
|
||||
true);
|
||||
true
|
||||
);
|
||||
MetadataIndexTemplateService service = new MetadataIndexTemplateService(null, createIndexService,
|
||||
new AliasValidator(), null,
|
||||
new IndexScopedSettings(Settings.EMPTY, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS), xContentRegistry);
|
||||
|
@ -925,12 +928,14 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
|
|||
indicesService,
|
||||
null,
|
||||
null,
|
||||
createTestShardLimitService(randomIntBetween(1, 1000)),
|
||||
new Environment(builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(), null),
|
||||
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS,
|
||||
null,
|
||||
xContentRegistry(),
|
||||
Collections.emptyList(),
|
||||
true);
|
||||
true
|
||||
);
|
||||
return new MetadataIndexTemplateService(
|
||||
clusterService, createIndexService, new AliasValidator(), indicesService,
|
||||
new IndexScopedSettings(Settings.EMPTY, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS), xContentRegistry());
|
||||
|
|
|
@ -30,12 +30,8 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.metadata.IndexGraveyard;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.cluster.shards.ShardCounts;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -85,7 +81,6 @@ import java.util.stream.Collectors;
|
|||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
|
||||
import static org.elasticsearch.cluster.shards.ShardCounts.forDataNodeCount;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -580,117 +575,4 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
|||
".*multiple engine factories provided for \\[foobar/.*\\]: \\[.*FooEngineFactory\\],\\[.*BarEngineFactory\\].*";
|
||||
assertThat(e, hasToString(new RegexMatcher(pattern)));
|
||||
}
|
||||
|
||||
public void testOverShardLimit() {
|
||||
int nodesInCluster = randomIntBetween(1,90);
|
||||
ShardCounts counts = forDataNodeCount(nodesInCluster);
|
||||
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode())
|
||||
.build();
|
||||
|
||||
ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(),
|
||||
clusterSettings);
|
||||
|
||||
int shardsToAdd = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
|
||||
Optional<String> errorMessage = IndicesService.checkShardLimit(shardsToAdd, state);
|
||||
|
||||
int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
|
||||
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
|
||||
int maxShards = counts.getShardsPerNode() * nodesInCluster;
|
||||
assertTrue(errorMessage.isPresent());
|
||||
assertEquals("this action would add [" + totalShards + "] total shards, but this cluster currently has [" + currentShards
|
||||
+ "]/[" + maxShards + "] maximum shards open", errorMessage.get());
|
||||
}
|
||||
|
||||
public void testUnderShardLimit() {
|
||||
int nodesInCluster = randomIntBetween(2,90);
|
||||
// Calculate the counts for a cluster 1 node smaller than we have to ensure we have headroom
|
||||
ShardCounts counts = forDataNodeCount(nodesInCluster - 1);
|
||||
|
||||
Settings clusterSettings = Settings.builder()
|
||||
.put(Metadata.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), counts.getShardsPerNode())
|
||||
.build();
|
||||
|
||||
ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(),
|
||||
clusterSettings);
|
||||
|
||||
int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
|
||||
int shardsToAdd = randomIntBetween(1, (counts.getShardsPerNode() * nodesInCluster) - existingShards);
|
||||
Optional<String> errorMessage = IndicesService.checkShardLimit(shardsToAdd, state);
|
||||
|
||||
assertFalse(errorMessage.isPresent());
|
||||
}
|
||||
|
||||
public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int shardsInIndex, int replicas,
|
||||
Settings clusterSettings) {
|
||||
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder();
|
||||
for (int i = 0; i < nodesInCluster; i++) {
|
||||
dataNodes.put(randomAlphaOfLengthBetween(5,15), mock(DiscoveryNode.class));
|
||||
}
|
||||
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
|
||||
when(nodes.getDataNodes()).thenReturn(dataNodes.build());
|
||||
|
||||
IndexMetadata.Builder indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 15))
|
||||
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
|
||||
.creationDate(randomLong())
|
||||
.numberOfShards(shardsInIndex)
|
||||
.numberOfReplicas(replicas);
|
||||
Metadata.Builder metadata = Metadata.builder().put(indexMetadata);
|
||||
if (randomBoolean()) {
|
||||
metadata.transientSettings(clusterSettings);
|
||||
} else {
|
||||
metadata.persistentSettings(clusterSettings);
|
||||
}
|
||||
|
||||
return ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metadata(metadata)
|
||||
.nodes(nodes)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void testOptimizeAutoGeneratedIdsSettingRemoval() throws Exception {
|
||||
final IndicesService indicesService = getIndicesService();
|
||||
|
||||
final Index index = new Index("foo-index", UUIDs.randomBase64UUID());
|
||||
Settings.Builder builder = Settings.builder()
|
||||
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_7_0_0)
|
||||
.put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID());
|
||||
IndexMetadata indexMetadata = new IndexMetadata.Builder(index.getName())
|
||||
.settings(builder.build())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
IndexService indexService = indicesService.createIndex(indexMetadata, Collections.emptyList(), false);
|
||||
assertNotNull(indexService);
|
||||
|
||||
final Index index2 = new Index("bar-index", UUIDs.randomBase64UUID());
|
||||
Settings.Builder builder2 = Settings.builder()
|
||||
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.V_7_0_0)
|
||||
.put(IndexMetadata.SETTING_INDEX_UUID, index2.getUUID())
|
||||
.put(EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS.getKey(), randomBoolean());
|
||||
IndexMetadata indexMetadata2 = new IndexMetadata.Builder(index2.getName())
|
||||
.settings(builder2.build())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
|
||||
() -> indicesService.createIndex(indexMetadata2, Collections.emptyList(), false));
|
||||
assertEquals("Setting [" + EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS.getKey() + "] was removed in version 7.0.0",
|
||||
ex.getMessage());
|
||||
|
||||
Version version = randomFrom(Version.V_6_0_0_rc1, Version.V_6_0_0, Version.V_6_2_0, Version.V_6_3_0, Version.V_6_4_0);
|
||||
builder = Settings.builder()
|
||||
.put(IndexMetadata.SETTING_VERSION_CREATED, version)
|
||||
.put(IndexMetadata.SETTING_INDEX_UUID, index2.getUUID())
|
||||
.put(EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS.getKey(), randomBoolean());
|
||||
IndexMetadata indexMetadata3 = new IndexMetadata.Builder(index2.getName())
|
||||
.settings(builder.build())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
IndexService indexService2 = indicesService.createIndex(indexMetadata3, Collections.emptyList(), false);
|
||||
assertNotNull(indexService2);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,185 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.cluster.shards.ShardCounts;
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.MetadataIndexStateServiceTests.addClosedIndex;
|
||||
import static org.elasticsearch.cluster.metadata.MetadataIndexStateServiceTests.addOpenedIndex;
|
||||
import static org.elasticsearch.cluster.shards.ShardCounts.forDataNodeCount;
|
||||
import static org.elasticsearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class ShardLimitValidatorTests extends ESTestCase {
|
||||
|
||||
public void testOverShardLimit() {
|
||||
int nodesInCluster = randomIntBetween(1, 90);
|
||||
ShardCounts counts = forDataNodeCount(nodesInCluster);
|
||||
|
||||
Settings clusterSettings = Settings.builder().build();
|
||||
|
||||
ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas()
|
||||
);
|
||||
|
||||
int shardsToAdd = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
|
||||
Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode());
|
||||
|
||||
int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
|
||||
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
|
||||
int maxShards = counts.getShardsPerNode() * nodesInCluster;
|
||||
assertTrue(errorMessage.isPresent());
|
||||
assertEquals("this action would add [" + totalShards + "] total shards, but this cluster currently has [" + currentShards
|
||||
+ "]/[" + maxShards + "] maximum shards open", errorMessage.get());
|
||||
}
|
||||
|
||||
public void testUnderShardLimit() {
|
||||
int nodesInCluster = randomIntBetween(2, 90);
|
||||
// Calculate the counts for a cluster 1 node smaller than we have to ensure we have headroom
|
||||
ShardCounts counts = forDataNodeCount(nodesInCluster - 1);
|
||||
|
||||
Settings clusterSettings = Settings.builder().build();
|
||||
|
||||
ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas()
|
||||
);
|
||||
|
||||
int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
|
||||
int shardsToAdd = randomIntBetween(1, (counts.getShardsPerNode() * nodesInCluster) - existingShards);
|
||||
Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode());
|
||||
|
||||
assertFalse(errorMessage.isPresent());
|
||||
}
|
||||
|
||||
public void testValidateShardLimit() {
|
||||
int nodesInCluster = randomIntBetween(2, 90);
|
||||
ShardCounts counts = forDataNodeCount(nodesInCluster);
|
||||
ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas(),
|
||||
counts.getFailingIndexShards(), counts.getFailingIndexReplicas());
|
||||
|
||||
Index[] indices = Arrays.stream(state.metadata().indices().values().toArray(IndexMetadata.class))
|
||||
.map(IndexMetadata::getIndex)
|
||||
.collect(Collectors.toList())
|
||||
.toArray(new Index[2]);
|
||||
|
||||
int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
|
||||
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
|
||||
int maxShards = counts.getShardsPerNode() * nodesInCluster;
|
||||
ShardLimitValidator shardLimitValidator = createTestShardLimitService(counts.getShardsPerNode());
|
||||
ValidationException exception = expectThrows(ValidationException.class,
|
||||
() -> shardLimitValidator.validateShardLimit(state, indices));
|
||||
assertEquals("Validation Failed: 1: this action would add [" + totalShards + "] total shards, but this cluster currently has [" +
|
||||
currentShards + "]/[" + maxShards + "] maximum shards open;", exception.getMessage());
|
||||
}
|
||||
|
||||
public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int shardsInIndex, int replicas) {
|
||||
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder();
|
||||
for (int i = 0; i < nodesInCluster; i++) {
|
||||
dataNodes.put(randomAlphaOfLengthBetween(5, 15), mock(DiscoveryNode.class));
|
||||
}
|
||||
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
|
||||
when(nodes.getDataNodes()).thenReturn(dataNodes.build());
|
||||
|
||||
IndexMetadata.Builder indexMetadata = IndexMetadata.builder(randomAlphaOfLengthBetween(5, 15))
|
||||
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT))
|
||||
.creationDate(randomLong())
|
||||
.numberOfShards(shardsInIndex)
|
||||
.numberOfReplicas(replicas);
|
||||
Metadata.Builder metadata = Metadata.builder().put(indexMetadata);
|
||||
if (randomBoolean()) {
|
||||
metadata.transientSettings(Settings.EMPTY);
|
||||
} else {
|
||||
metadata.persistentSettings(Settings.EMPTY);
|
||||
}
|
||||
|
||||
return ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metadata(metadata)
|
||||
.nodes(nodes)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas,
|
||||
int closedIndexShards, int closedIndexReplicas) {
|
||||
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder();
|
||||
for (int i = 0; i < nodesInCluster; i++) {
|
||||
dataNodes.put(randomAlphaOfLengthBetween(5, 15), mock(DiscoveryNode.class));
|
||||
}
|
||||
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
|
||||
when(nodes.getDataNodes()).thenReturn(dataNodes.build());
|
||||
|
||||
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build();
|
||||
state = addOpenedIndex(randomAlphaOfLengthBetween(5, 15), openIndexShards, openIndexReplicas, state);
|
||||
state = addClosedIndex(randomAlphaOfLengthBetween(5, 15), closedIndexShards, closedIndexReplicas, state);
|
||||
|
||||
final Metadata.Builder metadata = Metadata.builder(state.metadata());
|
||||
if (randomBoolean()) {
|
||||
metadata.persistentSettings(Settings.EMPTY);
|
||||
} else {
|
||||
metadata.transientSettings(Settings.EMPTY);
|
||||
}
|
||||
return ClusterState.builder(state).metadata(metadata).nodes(nodes).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link ShardLimitValidator} for testing with the given setting and a mocked cluster service.
|
||||
*
|
||||
* @param maxShardsPerNode the value to use for the max shards per node setting
|
||||
* @return a test instance
|
||||
*/
|
||||
public static ShardLimitValidator createTestShardLimitService(int maxShardsPerNode) {
|
||||
// Use a mocked clusterService - for unit tests we won't be updating the setting anyway.
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
Settings limitOnlySettings = Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), maxShardsPerNode).build();
|
||||
when(clusterService.getClusterSettings())
|
||||
.thenReturn(new ClusterSettings(limitOnlySettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
|
||||
return new ShardLimitValidator(limitOnlySettings, clusterService);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@link ShardLimitValidator} for testing with the given setting and a given cluster service.
|
||||
*
|
||||
* @param maxShardsPerNode the value to use for the max shards per node setting
|
||||
* @param clusterService the cluster service to use
|
||||
* @return a test instance
|
||||
*/
|
||||
public static ShardLimitValidator createTestShardLimitService(int maxShardsPerNode, ClusterService clusterService) {
|
||||
Settings limitOnlySettings = Settings.builder().put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), maxShardsPerNode).build();
|
||||
|
||||
return new ShardLimitValidator(limitOnlySettings, clusterService);
|
||||
}
|
||||
}
|
|
@ -88,6 +88,7 @@ import org.elasticsearch.index.IndexService;
|
|||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.test.gateway.TestGatewayAllocator;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
|
@ -197,13 +198,14 @@ public class ClusterStateChanges {
|
|||
|
||||
TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction = new TransportVerifyShardBeforeCloseAction(SETTINGS,
|
||||
transportService, clusterService, indicesService, threadPool, null, actionFilters);
|
||||
ShardLimitValidator shardLimitValidator = new ShardLimitValidator(SETTINGS, clusterService);
|
||||
MetadataIndexStateService indexStateService = new MetadataIndexStateService(clusterService, allocationService,
|
||||
metadataIndexUpgradeService, indicesService, threadPool, transportVerifyShardBeforeCloseAction);
|
||||
metadataIndexUpgradeService, indicesService, shardLimitValidator, threadPool, transportVerifyShardBeforeCloseAction);
|
||||
MetadataDeleteIndexService deleteIndexService = new MetadataDeleteIndexService(SETTINGS, clusterService, allocationService);
|
||||
MetadataUpdateSettingsService metadataUpdateSettingsService = new MetadataUpdateSettingsService(clusterService,
|
||||
allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, threadPool);
|
||||
allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, shardLimitValidator, threadPool);
|
||||
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(SETTINGS, clusterService, indicesService,
|
||||
allocationService, new AliasValidator(), environment,
|
||||
allocationService, new AliasValidator(), shardLimitValidator, environment,
|
||||
IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, threadPool, xContentRegistry, Collections.emptyList(), true);
|
||||
|
||||
transportCloseIndexAction = new TransportCloseIndexAction(SETTINGS, transportService, clusterService, threadPool,
|
||||
|
|
|
@ -155,6 +155,7 @@ import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
|
|||
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.ShardLimitValidator;
|
||||
import org.elasticsearch.indices.analysis.AnalysisModule;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
|
@ -1497,9 +1498,10 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
actionFilters),
|
||||
RetentionLeaseSyncer.EMPTY);
|
||||
Map<ActionType, TransportAction> actions = new HashMap<>();
|
||||
final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService);
|
||||
final MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService(settings, clusterService,
|
||||
indicesService,
|
||||
allocationService, new AliasValidator(), environment, indexScopedSettings,
|
||||
allocationService, new AliasValidator(), shardLimitValidator, environment, indexScopedSettings,
|
||||
threadPool, namedXContentRegistry, Collections.emptyList(), false);
|
||||
actions.put(CreateIndexAction.INSTANCE,
|
||||
new TransportCreateIndexAction(
|
||||
|
@ -1528,7 +1530,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
settings, namedXContentRegistry,
|
||||
mapperRegistry,
|
||||
indexScopedSettings),
|
||||
clusterSettings
|
||||
clusterSettings,
|
||||
shardLimitValidator
|
||||
);
|
||||
actions.put(PutMappingAction.INSTANCE,
|
||||
new TransportPutMappingAction(transportService, clusterService, threadPool, metadataMappingService,
|
||||
|
|
Loading…
Reference in New Issue