Merge branch 'bring_back_node_throttle'
This commit is contained in:
commit
eaad8924d8
|
@ -31,6 +31,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -41,46 +42,45 @@ import java.util.regex.Pattern;
|
|||
*/
|
||||
public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||
private Settings lastSettingsApplied = Settings.EMPTY;
|
||||
private final List<SettingUpdater<?>> settingUpdaters = new ArrayList<>();
|
||||
private final Map<String, Setting<?>> complexMatchers = new HashMap<>();
|
||||
private final Map<String, Setting<?>> keySettings = new HashMap<>();
|
||||
private final List<SettingUpdater<?>> settingUpdaters = new CopyOnWriteArrayList<>();
|
||||
private final Map<String, Setting<?>> complexMatchers;
|
||||
private final Map<String, Setting<?>> keySettings;
|
||||
private final Setting.Scope scope;
|
||||
private static final Pattern KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])*[-\\w]+$");
|
||||
private static final Pattern GROUP_KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])+$");
|
||||
|
||||
|
||||
protected AbstractScopedSettings(Settings settings, Set<Setting<?>> settingsSet, Setting.Scope scope) {
|
||||
super(settings);
|
||||
this.lastSettingsApplied = Settings.EMPTY;
|
||||
this.scope = scope;
|
||||
for (Setting<?> entry : settingsSet) {
|
||||
addSetting(entry);
|
||||
Map<String, Setting<?>> complexMatchers = new HashMap<>();
|
||||
Map<String, Setting<?>> keySettings = new HashMap<>();
|
||||
for (Setting<?> setting : settingsSet) {
|
||||
if (setting.getScope() != scope) {
|
||||
throw new IllegalArgumentException("Setting must be a " + scope + " setting but was: " + setting.getScope());
|
||||
}
|
||||
if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())) == false) {
|
||||
throw new IllegalArgumentException("illegal settings key: [" + setting.getKey() + "]");
|
||||
}
|
||||
if (setting.hasComplexMatcher()) {
|
||||
complexMatchers.putIfAbsent(setting.getKey(), setting);
|
||||
} else {
|
||||
keySettings.putIfAbsent(setting.getKey(), setting);
|
||||
}
|
||||
}
|
||||
this.complexMatchers = Collections.unmodifiableMap(complexMatchers);
|
||||
this.keySettings = Collections.unmodifiableMap(keySettings);
|
||||
}
|
||||
|
||||
protected AbstractScopedSettings(Settings nodeSettings, Settings scopeSettings, AbstractScopedSettings other) {
|
||||
super(nodeSettings);
|
||||
this.lastSettingsApplied = scopeSettings;
|
||||
this.scope = other.scope;
|
||||
complexMatchers.putAll(other.complexMatchers);
|
||||
keySettings.putAll(other.keySettings);
|
||||
complexMatchers = other.complexMatchers;
|
||||
keySettings = other.keySettings;
|
||||
settingUpdaters.addAll(other.settingUpdaters);
|
||||
}
|
||||
|
||||
protected final void addSetting(Setting<?> setting) {
|
||||
if (setting.getScope() != scope) {
|
||||
throw new IllegalArgumentException("Setting must be a " + scope + " setting but was: " + setting.getScope());
|
||||
}
|
||||
if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())) == false) {
|
||||
throw new IllegalArgumentException("illegal settings key: [" + setting.getKey() + "]");
|
||||
}
|
||||
if (setting.hasComplexMatcher()) {
|
||||
complexMatchers.putIfAbsent(setting.getKey(), setting);
|
||||
} else {
|
||||
keySettings.putIfAbsent(setting.getKey(), setting);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> iff the given key is a valid settings key otherwise <code>false</code>
|
||||
*/
|
||||
|
|
|
@ -91,8 +91,8 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
}
|
||||
|
||||
final IndexMetaData indexMetaData = metaData.index(shard.getIndex());
|
||||
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
|
||||
|
||||
// don't go wild here and create a new IndexSetting object for every shard this could cause a lot of garbage
|
||||
// on cluster restart if we allocate a boat load of shards
|
||||
if (shard.allocatedPostIndexCreate(indexMetaData) == false) {
|
||||
// when we create a fresh index
|
||||
continue;
|
||||
|
@ -108,13 +108,13 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
|
||||
final Set<String> lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id());
|
||||
final boolean snapshotRestore = shard.restoreSource() != null;
|
||||
final boolean recoverOnAnyNode = recoverOnAnyNode(indexSettings);
|
||||
final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData);
|
||||
|
||||
final NodesAndVersions nodesAndVersions;
|
||||
final boolean enoughAllocationsFound;
|
||||
|
||||
if (lastActiveAllocationIds.isEmpty()) {
|
||||
assert indexSettings.getIndexVersionCreated().before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new";
|
||||
assert Version.indexCreated(indexMetaData.getSettings()).before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new";
|
||||
// when we load an old index (after upgrading cluster) or restore a snapshot of an old index
|
||||
// fall back to old version-based allocation mode
|
||||
// Note that once the shard has been active, lastActiveAllocationIds will be non-empty
|
||||
|
@ -356,9 +356,9 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
* Return {@code true} if the index is configured to allow shards to be
|
||||
* recovered on any node
|
||||
*/
|
||||
private boolean recoverOnAnyNode(IndexSettings indexSettings) {
|
||||
return indexSettings.isOnSharedFilesystem()
|
||||
&& IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING.get(indexSettings.getSettings());
|
||||
private boolean recoverOnAnyNode(IndexMetaData metaData) {
|
||||
return (IndexMetaData.isOnSharedFilesystem(metaData.getSettings()) || IndexMetaData.isOnSharedFilesystem(this.settings))
|
||||
&& IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING.get(metaData.getSettings(), this.settings);
|
||||
}
|
||||
|
||||
protected abstract AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
|
||||
|
|
|
@ -258,8 +258,8 @@ public final class IndexModule {
|
|||
throw new IllegalStateException("store must not be null");
|
||||
}
|
||||
}
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate);
|
||||
final String queryCacheType = indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING);
|
||||
final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType);
|
||||
final QueryCache queryCache = queryCacheProvider.apply(indexSettings, servicesProvider.getIndicesQueryCache());
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.elasticsearch.index.translog.Translog;
|
|||
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
@ -167,10 +168,6 @@ public final class IndexSettings {
|
|||
this(indexMetaData, nodeSettings, (index) -> Regex.simpleMatch(index, indexMetaData.getIndex()), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
}
|
||||
|
||||
IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSettings, IndexScopedSettings indexScopedSettings) {
|
||||
this(indexMetaData, nodeSettings, (index) -> Regex.simpleMatch(index, indexMetaData.getIndex()), indexScopedSettings);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link IndexSettings} instance. The given node settings will be merged with the settings in the metadata
|
||||
* while index level settings will overwrite node settings.
|
||||
|
@ -200,24 +197,35 @@ public final class IndexSettings {
|
|||
this.defaultAllowUnmappedFields = scopedSettings.get(ALLOW_UNMAPPED);
|
||||
this.indexNameMatcher = indexNameMatcher;
|
||||
this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
|
||||
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
|
||||
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
|
||||
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTTING, this::setTranslogFlushThresholdSize);
|
||||
mergeSchedulerConfig = new MergeSchedulerConfig(this);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
|
||||
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
|
||||
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
|
||||
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow);
|
||||
TTLPurgeDisabled = scopedSettings.get(INDEX_TTL_DISABLE_PURGE_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TTL_DISABLE_PURGE_SETTING, this::setTTLPurgeDisabled);
|
||||
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
|
||||
assert indexNameMatcher.test(indexMetaData.getIndex());
|
||||
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, mergePolicyConfig::setFloorSegmentSetting);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, mergePolicyConfig::setMaxMergesAtOnce);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, mergePolicyConfig::setMaxMergesAtOnceExplicit);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, mergePolicyConfig::setMaxMergedSegment);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, mergePolicyConfig::setSegmentsPerTier);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, mergePolicyConfig::setReclaimDeletesWeight);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING, mergeSchedulerConfig::setMaxThreadCount);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING, mergeSchedulerConfig::setMaxMergeCount);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TTL_DISABLE_PURGE_SETTING, this::setTTLPurgeDisabled);
|
||||
scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTTING, this::setTranslogFlushThresholdSize);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
|
||||
}
|
||||
|
||||
private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
|
||||
|
@ -457,5 +465,5 @@ public final class IndexSettings {
|
|||
}
|
||||
|
||||
|
||||
public IndexScopedSettings getScopedSettings() { return scopedSettings;}
|
||||
IndexScopedSettings getScopedSettings() { return scopedSettings;}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.lucene.index.MergePolicy;
|
|||
import org.apache.lucene.index.NoMergePolicy;
|
||||
import org.apache.lucene.index.TieredMergePolicy;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
@ -137,16 +138,9 @@ public final class MergePolicyConfig {
|
|||
public static final String INDEX_MERGE_ENABLED = "index.merge.enabled"; // don't convert to Setting<> and register... we only set this in tests and register via a plugin
|
||||
|
||||
|
||||
MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) {
|
||||
MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) {
|
||||
this.logger = logger;
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_COMPOUND_FORMAT_SETTING, this::setNoCFSRatio);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, this::expungeDeletesAllowed);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, this::floorSegmentSetting);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, this::maxMergesAtOnce);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, this::maxMergesAtOnceExplicit);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, this::maxMergedSegment);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, this::segmentsPerTier);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, this::reclaimDeletesWeight);
|
||||
IndexScopedSettings scopedSettings = indexSettings.getScopedSettings();
|
||||
double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage
|
||||
ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING);
|
||||
int maxMergeAtOnce = indexSettings.getValue(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING);
|
||||
|
@ -168,39 +162,41 @@ public final class MergePolicyConfig {
|
|||
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac());
|
||||
mergePolicy.setSegmentsPerTier(segmentsPerTier);
|
||||
mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight);
|
||||
logger.debug("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]",
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]",
|
||||
forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight);
|
||||
}
|
||||
}
|
||||
|
||||
private void reclaimDeletesWeight(Double reclaimDeletesWeight) {
|
||||
void setReclaimDeletesWeight(Double reclaimDeletesWeight) {
|
||||
mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight);
|
||||
}
|
||||
|
||||
private void segmentsPerTier(Double segmentsPerTier) {
|
||||
void setSegmentsPerTier(Double segmentsPerTier) {
|
||||
mergePolicy.setSegmentsPerTier(segmentsPerTier);
|
||||
}
|
||||
|
||||
private void maxMergedSegment(ByteSizeValue maxMergedSegment) {
|
||||
void setMaxMergedSegment(ByteSizeValue maxMergedSegment) {
|
||||
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac());
|
||||
}
|
||||
|
||||
private void maxMergesAtOnceExplicit(Integer maxMergeAtOnceExplicit) {
|
||||
void setMaxMergesAtOnceExplicit(Integer maxMergeAtOnceExplicit) {
|
||||
mergePolicy.setMaxMergeAtOnceExplicit(maxMergeAtOnceExplicit);
|
||||
}
|
||||
|
||||
private void maxMergesAtOnce(Integer maxMergeAtOnce) {
|
||||
void setMaxMergesAtOnce(Integer maxMergeAtOnce) {
|
||||
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
|
||||
}
|
||||
|
||||
private void floorSegmentSetting(ByteSizeValue floorSegementSetting) {
|
||||
void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) {
|
||||
mergePolicy.setFloorSegmentMB(floorSegementSetting.mbFrac());
|
||||
}
|
||||
|
||||
private void expungeDeletesAllowed(Double value) {
|
||||
void setExpungeDeletesAllowed(Double value) {
|
||||
mergePolicy.setForceMergeDeletesPctAllowed(value);
|
||||
}
|
||||
|
||||
private void setNoCFSRatio(Double noCFSRatio) {
|
||||
void setNoCFSRatio(Double noCFSRatio) {
|
||||
mergePolicy.setNoCFSRatio(noCFSRatio);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,9 +21,7 @@ package org.elasticsearch.index;
|
|||
|
||||
import org.apache.lucene.index.ConcurrentMergeScheduler;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
|
||||
/**
|
||||
* The merge scheduler (<code>ConcurrentMergeScheduler</code>) controls the execution of
|
||||
|
@ -62,9 +60,6 @@ public final class MergeSchedulerConfig {
|
|||
private volatile int maxMergeCount;
|
||||
|
||||
MergeSchedulerConfig(IndexSettings indexSettings) {
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(MAX_THREAD_COUNT_SETTING, this::setMaxThreadCount);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(MAX_MERGE_COUNT_SETTING, this::setMaxMergeCount);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(AUTO_THROTTLE_SETTING, this::setAutoThrottle);
|
||||
maxThreadCount = indexSettings.getValue(MAX_THREAD_COUNT_SETTING);
|
||||
maxMergeCount = indexSettings.getValue(MAX_MERGE_COUNT_SETTING);
|
||||
this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.index.store;
|
|||
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.AbstractIndexComponent;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
|
@ -30,16 +29,17 @@ import org.elasticsearch.index.shard.ShardPath;
|
|||
*
|
||||
*/
|
||||
public class IndexStore extends AbstractIndexComponent {
|
||||
public static final Setting<StoreRateLimiting.Type> INDEX_STORE_THROTTLE_TYPE_SETTING = new Setting<>("index.store.throttle.type", "none", StoreRateLimiting.Type::fromString, true, Setting.Scope.INDEX) ;
|
||||
public static final Setting<IndexRateLimitingType> INDEX_STORE_THROTTLE_TYPE_SETTING = new Setting<>("index.store.throttle.type", "none", IndexRateLimitingType::fromString, true, Setting.Scope.INDEX) ;
|
||||
public static final Setting<ByteSizeValue> INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting("index.store.throttle.max_bytes_per_sec", new ByteSizeValue(0), true, Setting.Scope.INDEX);
|
||||
|
||||
protected final IndexStoreConfig indexStoreConfig;
|
||||
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
|
||||
private volatile IndexRateLimitingType type;
|
||||
|
||||
public IndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
|
||||
super(indexSettings);
|
||||
this.indexStoreConfig = indexStoreConfig;
|
||||
rateLimiting.setType(indexSettings.getValue(INDEX_STORE_THROTTLE_TYPE_SETTING));
|
||||
setType(indexSettings.getValue(INDEX_STORE_THROTTLE_TYPE_SETTING));
|
||||
rateLimiting.setMaxRate(indexSettings.getValue(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING));
|
||||
logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimiting.getType(), rateLimiting.getRateLimiter());
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ public class IndexStore extends AbstractIndexComponent {
|
|||
* the node level one (defaults to the node level one).
|
||||
*/
|
||||
public StoreRateLimiting rateLimiting() {
|
||||
return rateLimiting.getType() == StoreRateLimiting.Type.NONE ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting;
|
||||
return type.useStoreLimiter() ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -59,11 +59,44 @@ public class IndexStore extends AbstractIndexComponent {
|
|||
return new FsDirectoryService(indexSettings, this, path);
|
||||
}
|
||||
|
||||
public void setType(StoreRateLimiting.Type type) {
|
||||
rateLimiting.setType(type);
|
||||
public void setType(IndexRateLimitingType type) {
|
||||
this.type = type;
|
||||
if (type.useStoreLimiter() == false) {
|
||||
rateLimiting.setType(type.type);
|
||||
}
|
||||
}
|
||||
|
||||
public void setMaxRate(ByteSizeValue rate) {
|
||||
rateLimiting.setMaxRate(rate);
|
||||
}
|
||||
|
||||
/**
|
||||
* On an index level we can configure all of {@link org.apache.lucene.store.StoreRateLimiting.Type} as well as
|
||||
* <tt>node</tt> which will then use a global rate limiter that has it's own configuration. The global one is
|
||||
* configured in {@link IndexStoreConfig} which is managed by the per-node {@link org.elasticsearch.indices.IndicesService}
|
||||
*/
|
||||
public static final class IndexRateLimitingType {
|
||||
private final StoreRateLimiting.Type type;
|
||||
|
||||
private IndexRateLimitingType(StoreRateLimiting.Type type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
private boolean useStoreLimiter() {
|
||||
return type == null;
|
||||
}
|
||||
|
||||
static IndexRateLimitingType fromString(String type) {
|
||||
if ("node".equalsIgnoreCase(type)) {
|
||||
return new IndexRateLimitingType(null);
|
||||
} else {
|
||||
try {
|
||||
return new IndexRateLimitingType(StoreRateLimiting.Type.fromString(type));
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new IllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none|node]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -111,11 +111,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
}
|
||||
|
||||
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
|
||||
IndexSettings indexSettings = new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings);
|
||||
// Note, closed indices will not have any routing information, so won't be deleted
|
||||
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
|
||||
if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) {
|
||||
ShardId shardId = indexShardRoutingTable.shardId();
|
||||
IndexService indexService = indicesService.indexService(indexRoutingTable.getIndex());
|
||||
IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings);
|
||||
if (indicesService.canDeleteShardContent(shardId, indexSettings)) {
|
||||
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.lucene.store.MMapDirectory;
|
|||
import org.apache.lucene.store.NIOFSDirectory;
|
||||
import org.apache.lucene.store.NoLockFactory;
|
||||
import org.apache.lucene.store.SimpleFSDirectory;
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -95,4 +96,24 @@ public class IndexStoreTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testUpdateThrottleType() throws IOException {
|
||||
Settings settings = Settings.settingsBuilder().put(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING.getKey(), "all")
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(new Index("foo"), settings);
|
||||
IndexStoreConfig indexStoreConfig = new IndexStoreConfig(settings);
|
||||
IndexStore store = new IndexStore(indexSettings, indexStoreConfig);
|
||||
assertEquals(StoreRateLimiting.Type.NONE, store.rateLimiting().getType());
|
||||
assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType());
|
||||
assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting());
|
||||
|
||||
store.setType(IndexStore.IndexRateLimitingType.fromString("NODE"));
|
||||
assertEquals(StoreRateLimiting.Type.ALL, store.rateLimiting().getType());
|
||||
assertSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting());
|
||||
|
||||
store.setType(IndexStore.IndexRateLimitingType.fromString("merge"));
|
||||
assertEquals(StoreRateLimiting.Type.MERGE, store.rateLimiting().getType());
|
||||
assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting());
|
||||
assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1540,7 +1540,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
|
||||
// Update settings to back to normal
|
||||
assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "none")
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "node")
|
||||
));
|
||||
|
||||
logger.info("--> wait for snapshot to complete");
|
||||
|
|
Loading…
Reference in New Issue