From d9422b5e8995d81b79ed4b9791b86e5791cb78a5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 21 Jan 2016 10:36:43 +0100 Subject: [PATCH] Bring back node throttle type which was lost after index setting refactoring This has caused some test failures lately especially on window (which is likely caused by the rather bad performance of the windows test machines). See one failure here: http://build-us-00.elastic.co/job/es_core_master_window-2008/2934/ This fix has now also a unittest that tests this issue separately. --- .../settings/AbstractScopedSettings.java | 44 +++++++++--------- .../org/elasticsearch/index/IndexModule.java | 2 +- .../elasticsearch/index/IndexSettings.java | 6 +-- .../index/MergePolicyConfig.java | 24 +++++----- .../elasticsearch/index/store/IndexStore.java | 45 ++++++++++++++++--- .../indices/store/IndicesStore.java | 3 +- .../index/store/IndexStoreTests.java | 21 +++++++++ .../SharedClusterSnapshotRestoreIT.java | 2 +- 8 files changed, 101 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index 9bddd73d632..65db6d155d3 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -41,46 +42,45 @@ import java.util.regex.Pattern; */ public abstract class AbstractScopedSettings extends AbstractComponent { private Settings lastSettingsApplied = Settings.EMPTY; - private final List> settingUpdaters = new ArrayList<>(); - private final Map> complexMatchers = new HashMap<>(); - private final Map> keySettings = new HashMap<>(); + private final List> settingUpdaters = new CopyOnWriteArrayList<>(); + private final Map> complexMatchers; + private final Map> keySettings; private final Setting.Scope scope; private static final Pattern KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])*[-\\w]+$"); private static final Pattern GROUP_KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])+$"); - protected AbstractScopedSettings(Settings settings, Set> settingsSet, Setting.Scope scope) { super(settings); this.lastSettingsApplied = Settings.EMPTY; this.scope = scope; - for (Setting entry : settingsSet) { - addSetting(entry); + Map> complexMatchers = new HashMap<>(); + Map> keySettings = new HashMap<>(); + for (Setting setting : settingsSet) { + if (setting.getScope() != scope) { + throw new IllegalArgumentException("Setting must be a " + scope + " setting but was: " + setting.getScope()); + } + if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())) == false) { + throw new IllegalArgumentException("illegal settings key: [" + setting.getKey() + "]"); + } + if (setting.hasComplexMatcher()) { + complexMatchers.putIfAbsent(setting.getKey(), setting); + } else { + keySettings.putIfAbsent(setting.getKey(), setting); + } } + this.complexMatchers = Collections.unmodifiableMap(complexMatchers); + this.keySettings = Collections.unmodifiableMap(keySettings); } protected AbstractScopedSettings(Settings nodeSettings, Settings scopeSettings, AbstractScopedSettings other) { super(nodeSettings); this.lastSettingsApplied = scopeSettings; this.scope = other.scope; - complexMatchers.putAll(other.complexMatchers); - keySettings.putAll(other.keySettings); + complexMatchers = other.complexMatchers; + keySettings = other.keySettings; settingUpdaters.addAll(other.settingUpdaters); } - protected final void addSetting(Setting setting) { - if (setting.getScope() != scope) { - throw new IllegalArgumentException("Setting must be a " + scope + " setting but was: " + setting.getScope()); - } - if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())) == false) { - throw new IllegalArgumentException("illegal settings key: [" + setting.getKey() + "]"); - } - if (setting.hasComplexMatcher()) { - complexMatchers.putIfAbsent(setting.getKey(), setting); - } else { - keySettings.putIfAbsent(setting.getKey(), setting); - } - } - /** * Returns true iff the given key is a valid settings key otherwise false */ diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index b09d91b1c20..4688fba5034 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -258,8 +258,8 @@ public final class IndexModule { throw new IllegalStateException("store must not be null"); } } - indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate); indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType); + indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate); final String queryCacheType = indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING); final BiFunction queryCacheProvider = queryCaches.get(queryCacheType); final QueryCache queryCache = queryCacheProvider.apply(indexSettings, servicesProvider.getIndicesQueryCache()); diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 715bea51694..dc002135966 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -167,10 +167,6 @@ public final class IndexSettings { this(indexMetaData, nodeSettings, (index) -> Regex.simpleMatch(index, indexMetaData.getIndex()), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS); } - IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSettings, IndexScopedSettings indexScopedSettings) { - this(indexMetaData, nodeSettings, (index) -> Regex.simpleMatch(index, indexMetaData.getIndex()), indexScopedSettings); - } - /** * Creates a new {@link IndexSettings} instance. The given node settings will be merged with the settings in the metadata * while index level settings will overwrite node settings. @@ -457,5 +453,5 @@ public final class IndexSettings { } - public IndexScopedSettings getScopedSettings() { return scopedSettings;} + IndexScopedSettings getScopedSettings() { return scopedSettings;} } diff --git a/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java b/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java index 362e9099ee2..bbe8c566de9 100644 --- a/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java +++ b/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.TieredMergePolicy; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -137,16 +138,17 @@ public final class MergePolicyConfig { public static final String INDEX_MERGE_ENABLED = "index.merge.enabled"; // don't convert to Setting<> and register... we only set this in tests and register via a plugin - MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) { + MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) { this.logger = logger; - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_COMPOUND_FORMAT_SETTING, this::setNoCFSRatio); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, this::expungeDeletesAllowed); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, this::floorSegmentSetting); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, this::maxMergesAtOnce); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, this::maxMergesAtOnceExplicit); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, this::maxMergedSegment); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, this::segmentsPerTier); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, this::reclaimDeletesWeight); + IndexScopedSettings scopedSettings = indexSettings.getScopedSettings(); + scopedSettings.addSettingsUpdateConsumer(INDEX_COMPOUND_FORMAT_SETTING, this::setNoCFSRatio); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, this::expungeDeletesAllowed); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, this::floorSegmentSetting); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, this::maxMergesAtOnce); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, this::maxMergesAtOnceExplicit); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, this::maxMergedSegment); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, this::segmentsPerTier); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, this::reclaimDeletesWeight); double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING); int maxMergeAtOnce = indexSettings.getValue(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING); @@ -168,8 +170,10 @@ public final class MergePolicyConfig { mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac()); mergePolicy.setSegmentsPerTier(segmentsPerTier); mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight); - logger.debug("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]", + if (logger.isTraceEnabled()) { + logger.trace("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]", forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight); + } } private void reclaimDeletesWeight(Double reclaimDeletesWeight) { diff --git a/core/src/main/java/org/elasticsearch/index/store/IndexStore.java b/core/src/main/java/org/elasticsearch/index/store/IndexStore.java index 29401fdfd51..e98ad7cc6eb 100644 --- a/core/src/main/java/org/elasticsearch/index/store/IndexStore.java +++ b/core/src/main/java/org/elasticsearch/index/store/IndexStore.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.store; import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.IndexSettings; @@ -30,16 +29,17 @@ import org.elasticsearch.index.shard.ShardPath; * */ public class IndexStore extends AbstractIndexComponent { - public static final Setting INDEX_STORE_THROTTLE_TYPE_SETTING = new Setting<>("index.store.throttle.type", "none", StoreRateLimiting.Type::fromString, true, Setting.Scope.INDEX) ; + public static final Setting INDEX_STORE_THROTTLE_TYPE_SETTING = new Setting<>("index.store.throttle.type", "none", IndexRateLimitingType::fromString, true, Setting.Scope.INDEX) ; public static final Setting INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting("index.store.throttle.max_bytes_per_sec", new ByteSizeValue(0), true, Setting.Scope.INDEX); protected final IndexStoreConfig indexStoreConfig; private final StoreRateLimiting rateLimiting = new StoreRateLimiting(); + private volatile IndexRateLimitingType type; public IndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) { super(indexSettings); this.indexStoreConfig = indexStoreConfig; - rateLimiting.setType(indexSettings.getValue(INDEX_STORE_THROTTLE_TYPE_SETTING)); + setType(indexSettings.getValue(INDEX_STORE_THROTTLE_TYPE_SETTING)); rateLimiting.setMaxRate(indexSettings.getValue(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING)); logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimiting.getType(), rateLimiting.getRateLimiter()); } @@ -49,7 +49,7 @@ public class IndexStore extends AbstractIndexComponent { * the node level one (defaults to the node level one). */ public StoreRateLimiting rateLimiting() { - return rateLimiting.getType() == StoreRateLimiting.Type.NONE ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting; + return type.useStoreLimiter() ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting; } /** @@ -59,11 +59,44 @@ public class IndexStore extends AbstractIndexComponent { return new FsDirectoryService(indexSettings, this, path); } - public void setType(StoreRateLimiting.Type type) { - rateLimiting.setType(type); + public void setType(IndexRateLimitingType type) { + this.type = type; + if (type.useStoreLimiter() == false) { + rateLimiting.setType(type.type); + } } public void setMaxRate(ByteSizeValue rate) { rateLimiting.setMaxRate(rate); } + + /** + * On an index level we can configure all of {@link org.apache.lucene.store.StoreRateLimiting.Type} as well as + * node which will then use a global rate limiter that has it's own configuration. The global one is + * configured in {@link IndexStoreConfig} which is managed by the per-node {@link org.elasticsearch.indices.IndicesService} + */ + public static final class IndexRateLimitingType { + private final StoreRateLimiting.Type type; + + private IndexRateLimitingType(StoreRateLimiting.Type type) { + this.type = type; + } + + private boolean useStoreLimiter() { + return type == null; + } + + static IndexRateLimitingType fromString(String type) { + if ("node".equalsIgnoreCase(type)) { + return new IndexRateLimitingType(null); + } else { + try { + return new IndexRateLimitingType(StoreRateLimiting.Type.fromString(type)); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none|node]"); + } + } + } + } + } diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index dc483932fec..d1075031ee1 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -111,11 +111,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe } for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { - IndexSettings indexSettings = new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings); // Note, closed indices will not have any routing information, so won't be deleted for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) { ShardId shardId = indexShardRoutingTable.shardId(); + IndexService indexService = indicesService.indexService(indexRoutingTable.getIndex()); + IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings); if (indicesService.canDeleteShardContent(shardId, indexSettings)) { deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable); } diff --git a/core/src/test/java/org/elasticsearch/index/store/IndexStoreTests.java b/core/src/test/java/org/elasticsearch/index/store/IndexStoreTests.java index 300e4bb9ab4..214c86a498a 100644 --- a/core/src/test/java/org/elasticsearch/index/store/IndexStoreTests.java +++ b/core/src/test/java/org/elasticsearch/index/store/IndexStoreTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.store.SimpleFSDirectory; +import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.util.Constants; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -95,4 +96,24 @@ public class IndexStoreTests extends ESTestCase { } } } + + public void testUpdateThrottleType() throws IOException { + Settings settings = Settings.settingsBuilder().put(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING.getKey(), "all") + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(new Index("foo"), settings); + IndexStoreConfig indexStoreConfig = new IndexStoreConfig(settings); + IndexStore store = new IndexStore(indexSettings, indexStoreConfig); + assertEquals(StoreRateLimiting.Type.NONE, store.rateLimiting().getType()); + assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType()); + assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting()); + + store.setType(IndexStore.IndexRateLimitingType.fromString("NODE")); + assertEquals(StoreRateLimiting.Type.ALL, store.rateLimiting().getType()); + assertSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting()); + + store.setType(IndexStore.IndexRateLimitingType.fromString("merge")); + assertEquals(StoreRateLimiting.Type.MERGE, store.rateLimiting().getType()); + assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting()); + assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType()); + } } diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 4cbf436a743..dac6ac0904d 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1540,7 +1540,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas // Update settings to back to normal assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(Settings.builder() - .put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "none") + .put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "node") )); logger.info("--> wait for snapshot to complete");