Bring back node throttle type which was lost after index setting refactoring
This has caused some test failures lately especially on window (which is likely caused by the rather bad performance of the windows test machines). See one failure here: http://build-us-00.elastic.co/job/es_core_master_window-2008/2934/ This fix has now also a unittest that tests this issue separately.
This commit is contained in:
parent
4a71898e09
commit
d9422b5e89
|
@ -31,6 +31,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -41,46 +42,45 @@ import java.util.regex.Pattern;
|
|||
*/
|
||||
public abstract class AbstractScopedSettings extends AbstractComponent {
|
||||
private Settings lastSettingsApplied = Settings.EMPTY;
|
||||
private final List<SettingUpdater<?>> settingUpdaters = new ArrayList<>();
|
||||
private final Map<String, Setting<?>> complexMatchers = new HashMap<>();
|
||||
private final Map<String, Setting<?>> keySettings = new HashMap<>();
|
||||
private final List<SettingUpdater<?>> settingUpdaters = new CopyOnWriteArrayList<>();
|
||||
private final Map<String, Setting<?>> complexMatchers;
|
||||
private final Map<String, Setting<?>> keySettings;
|
||||
private final Setting.Scope scope;
|
||||
private static final Pattern KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])*[-\\w]+$");
|
||||
private static final Pattern GROUP_KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])+$");
|
||||
|
||||
|
||||
protected AbstractScopedSettings(Settings settings, Set<Setting<?>> settingsSet, Setting.Scope scope) {
|
||||
super(settings);
|
||||
this.lastSettingsApplied = Settings.EMPTY;
|
||||
this.scope = scope;
|
||||
for (Setting<?> entry : settingsSet) {
|
||||
addSetting(entry);
|
||||
Map<String, Setting<?>> complexMatchers = new HashMap<>();
|
||||
Map<String, Setting<?>> keySettings = new HashMap<>();
|
||||
for (Setting<?> setting : settingsSet) {
|
||||
if (setting.getScope() != scope) {
|
||||
throw new IllegalArgumentException("Setting must be a " + scope + " setting but was: " + setting.getScope());
|
||||
}
|
||||
if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())) == false) {
|
||||
throw new IllegalArgumentException("illegal settings key: [" + setting.getKey() + "]");
|
||||
}
|
||||
if (setting.hasComplexMatcher()) {
|
||||
complexMatchers.putIfAbsent(setting.getKey(), setting);
|
||||
} else {
|
||||
keySettings.putIfAbsent(setting.getKey(), setting);
|
||||
}
|
||||
}
|
||||
this.complexMatchers = Collections.unmodifiableMap(complexMatchers);
|
||||
this.keySettings = Collections.unmodifiableMap(keySettings);
|
||||
}
|
||||
|
||||
protected AbstractScopedSettings(Settings nodeSettings, Settings scopeSettings, AbstractScopedSettings other) {
|
||||
super(nodeSettings);
|
||||
this.lastSettingsApplied = scopeSettings;
|
||||
this.scope = other.scope;
|
||||
complexMatchers.putAll(other.complexMatchers);
|
||||
keySettings.putAll(other.keySettings);
|
||||
complexMatchers = other.complexMatchers;
|
||||
keySettings = other.keySettings;
|
||||
settingUpdaters.addAll(other.settingUpdaters);
|
||||
}
|
||||
|
||||
protected final void addSetting(Setting<?> setting) {
|
||||
if (setting.getScope() != scope) {
|
||||
throw new IllegalArgumentException("Setting must be a " + scope + " setting but was: " + setting.getScope());
|
||||
}
|
||||
if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())) == false) {
|
||||
throw new IllegalArgumentException("illegal settings key: [" + setting.getKey() + "]");
|
||||
}
|
||||
if (setting.hasComplexMatcher()) {
|
||||
complexMatchers.putIfAbsent(setting.getKey(), setting);
|
||||
} else {
|
||||
keySettings.putIfAbsent(setting.getKey(), setting);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> iff the given key is a valid settings key otherwise <code>false</code>
|
||||
*/
|
||||
|
|
|
@ -258,8 +258,8 @@ public final class IndexModule {
|
|||
throw new IllegalStateException("store must not be null");
|
||||
}
|
||||
}
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate);
|
||||
final String queryCacheType = indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING);
|
||||
final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType);
|
||||
final QueryCache queryCache = queryCacheProvider.apply(indexSettings, servicesProvider.getIndicesQueryCache());
|
||||
|
|
|
@ -167,10 +167,6 @@ public final class IndexSettings {
|
|||
this(indexMetaData, nodeSettings, (index) -> Regex.simpleMatch(index, indexMetaData.getIndex()), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS);
|
||||
}
|
||||
|
||||
IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSettings, IndexScopedSettings indexScopedSettings) {
|
||||
this(indexMetaData, nodeSettings, (index) -> Regex.simpleMatch(index, indexMetaData.getIndex()), indexScopedSettings);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@link IndexSettings} instance. The given node settings will be merged with the settings in the metadata
|
||||
* while index level settings will overwrite node settings.
|
||||
|
@ -457,5 +453,5 @@ public final class IndexSettings {
|
|||
}
|
||||
|
||||
|
||||
public IndexScopedSettings getScopedSettings() { return scopedSettings;}
|
||||
IndexScopedSettings getScopedSettings() { return scopedSettings;}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.lucene.index.MergePolicy;
|
|||
import org.apache.lucene.index.NoMergePolicy;
|
||||
import org.apache.lucene.index.TieredMergePolicy;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
@ -137,16 +138,17 @@ public final class MergePolicyConfig {
|
|||
public static final String INDEX_MERGE_ENABLED = "index.merge.enabled"; // don't convert to Setting<> and register... we only set this in tests and register via a plugin
|
||||
|
||||
|
||||
MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) {
|
||||
MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) {
|
||||
this.logger = logger;
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_COMPOUND_FORMAT_SETTING, this::setNoCFSRatio);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, this::expungeDeletesAllowed);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, this::floorSegmentSetting);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, this::maxMergesAtOnce);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, this::maxMergesAtOnceExplicit);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, this::maxMergedSegment);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, this::segmentsPerTier);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, this::reclaimDeletesWeight);
|
||||
IndexScopedSettings scopedSettings = indexSettings.getScopedSettings();
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_COMPOUND_FORMAT_SETTING, this::setNoCFSRatio);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, this::expungeDeletesAllowed);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, this::floorSegmentSetting);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, this::maxMergesAtOnce);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, this::maxMergesAtOnceExplicit);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, this::maxMergedSegment);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, this::segmentsPerTier);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, this::reclaimDeletesWeight);
|
||||
double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage
|
||||
ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING);
|
||||
int maxMergeAtOnce = indexSettings.getValue(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING);
|
||||
|
@ -168,8 +170,10 @@ public final class MergePolicyConfig {
|
|||
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac());
|
||||
mergePolicy.setSegmentsPerTier(segmentsPerTier);
|
||||
mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight);
|
||||
logger.debug("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]",
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]",
|
||||
forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight);
|
||||
}
|
||||
}
|
||||
|
||||
private void reclaimDeletesWeight(Double reclaimDeletesWeight) {
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.index.store;
|
|||
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.AbstractIndexComponent;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
|
@ -30,16 +29,17 @@ import org.elasticsearch.index.shard.ShardPath;
|
|||
*
|
||||
*/
|
||||
public class IndexStore extends AbstractIndexComponent {
|
||||
public static final Setting<StoreRateLimiting.Type> INDEX_STORE_THROTTLE_TYPE_SETTING = new Setting<>("index.store.throttle.type", "none", StoreRateLimiting.Type::fromString, true, Setting.Scope.INDEX) ;
|
||||
public static final Setting<IndexRateLimitingType> INDEX_STORE_THROTTLE_TYPE_SETTING = new Setting<>("index.store.throttle.type", "none", IndexRateLimitingType::fromString, true, Setting.Scope.INDEX) ;
|
||||
public static final Setting<ByteSizeValue> INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting("index.store.throttle.max_bytes_per_sec", new ByteSizeValue(0), true, Setting.Scope.INDEX);
|
||||
|
||||
protected final IndexStoreConfig indexStoreConfig;
|
||||
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
|
||||
private volatile IndexRateLimitingType type;
|
||||
|
||||
public IndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) {
|
||||
super(indexSettings);
|
||||
this.indexStoreConfig = indexStoreConfig;
|
||||
rateLimiting.setType(indexSettings.getValue(INDEX_STORE_THROTTLE_TYPE_SETTING));
|
||||
setType(indexSettings.getValue(INDEX_STORE_THROTTLE_TYPE_SETTING));
|
||||
rateLimiting.setMaxRate(indexSettings.getValue(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING));
|
||||
logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimiting.getType(), rateLimiting.getRateLimiter());
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ public class IndexStore extends AbstractIndexComponent {
|
|||
* the node level one (defaults to the node level one).
|
||||
*/
|
||||
public StoreRateLimiting rateLimiting() {
|
||||
return rateLimiting.getType() == StoreRateLimiting.Type.NONE ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting;
|
||||
return type.useStoreLimiter() ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -59,11 +59,44 @@ public class IndexStore extends AbstractIndexComponent {
|
|||
return new FsDirectoryService(indexSettings, this, path);
|
||||
}
|
||||
|
||||
public void setType(StoreRateLimiting.Type type) {
|
||||
rateLimiting.setType(type);
|
||||
public void setType(IndexRateLimitingType type) {
|
||||
this.type = type;
|
||||
if (type.useStoreLimiter() == false) {
|
||||
rateLimiting.setType(type.type);
|
||||
}
|
||||
}
|
||||
|
||||
public void setMaxRate(ByteSizeValue rate) {
|
||||
rateLimiting.setMaxRate(rate);
|
||||
}
|
||||
|
||||
/**
|
||||
* On an index level we can configure all of {@link org.apache.lucene.store.StoreRateLimiting.Type} as well as
|
||||
* <tt>node</tt> which will then use a global rate limiter that has it's own configuration. The global one is
|
||||
* configured in {@link IndexStoreConfig} which is managed by the per-node {@link org.elasticsearch.indices.IndicesService}
|
||||
*/
|
||||
public static final class IndexRateLimitingType {
|
||||
private final StoreRateLimiting.Type type;
|
||||
|
||||
private IndexRateLimitingType(StoreRateLimiting.Type type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
private boolean useStoreLimiter() {
|
||||
return type == null;
|
||||
}
|
||||
|
||||
static IndexRateLimitingType fromString(String type) {
|
||||
if ("node".equalsIgnoreCase(type)) {
|
||||
return new IndexRateLimitingType(null);
|
||||
} else {
|
||||
try {
|
||||
return new IndexRateLimitingType(StoreRateLimiting.Type.fromString(type));
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new IllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none|node]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -111,11 +111,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
}
|
||||
|
||||
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
|
||||
IndexSettings indexSettings = new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings);
|
||||
// Note, closed indices will not have any routing information, so won't be deleted
|
||||
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
|
||||
if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) {
|
||||
ShardId shardId = indexShardRoutingTable.shardId();
|
||||
IndexService indexService = indicesService.indexService(indexRoutingTable.getIndex());
|
||||
IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings);
|
||||
if (indicesService.canDeleteShardContent(shardId, indexSettings)) {
|
||||
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.lucene.store.MMapDirectory;
|
|||
import org.apache.lucene.store.NIOFSDirectory;
|
||||
import org.apache.lucene.store.NoLockFactory;
|
||||
import org.apache.lucene.store.SimpleFSDirectory;
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
|
@ -95,4 +96,24 @@ public class IndexStoreTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testUpdateThrottleType() throws IOException {
|
||||
Settings settings = Settings.settingsBuilder().put(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING.getKey(), "all")
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(new Index("foo"), settings);
|
||||
IndexStoreConfig indexStoreConfig = new IndexStoreConfig(settings);
|
||||
IndexStore store = new IndexStore(indexSettings, indexStoreConfig);
|
||||
assertEquals(StoreRateLimiting.Type.NONE, store.rateLimiting().getType());
|
||||
assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType());
|
||||
assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting());
|
||||
|
||||
store.setType(IndexStore.IndexRateLimitingType.fromString("NODE"));
|
||||
assertEquals(StoreRateLimiting.Type.ALL, store.rateLimiting().getType());
|
||||
assertSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting());
|
||||
|
||||
store.setType(IndexStore.IndexRateLimitingType.fromString("merge"));
|
||||
assertEquals(StoreRateLimiting.Type.MERGE, store.rateLimiting().getType());
|
||||
assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting());
|
||||
assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1540,7 +1540,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
|
||||
// Update settings to back to normal
|
||||
assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "none")
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "node")
|
||||
));
|
||||
|
||||
logger.info("--> wait for snapshot to complete");
|
||||
|
|
Loading…
Reference in New Issue