From d9422b5e8995d81b79ed4b9791b86e5791cb78a5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 21 Jan 2016 10:36:43 +0100 Subject: [PATCH 1/2] Bring back node throttle type which was lost after index setting refactoring This has caused some test failures lately especially on window (which is likely caused by the rather bad performance of the windows test machines). See one failure here: http://build-us-00.elastic.co/job/es_core_master_window-2008/2934/ This fix has now also a unittest that tests this issue separately. --- .../settings/AbstractScopedSettings.java | 44 +++++++++--------- .../org/elasticsearch/index/IndexModule.java | 2 +- .../elasticsearch/index/IndexSettings.java | 6 +-- .../index/MergePolicyConfig.java | 24 +++++----- .../elasticsearch/index/store/IndexStore.java | 45 ++++++++++++++++--- .../indices/store/IndicesStore.java | 3 +- .../index/store/IndexStoreTests.java | 21 +++++++++ .../SharedClusterSnapshotRestoreIT.java | 2 +- 8 files changed, 101 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java index 9bddd73d632..65db6d155d3 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/AbstractScopedSettings.java @@ -31,6 +31,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.regex.Pattern; @@ -41,46 +42,45 @@ import java.util.regex.Pattern; */ public abstract class AbstractScopedSettings extends AbstractComponent { private Settings lastSettingsApplied = Settings.EMPTY; - private final List> settingUpdaters = new ArrayList<>(); - private final Map> complexMatchers = new HashMap<>(); - private final Map> keySettings = new HashMap<>(); + private final List> settingUpdaters = new CopyOnWriteArrayList<>(); + private final Map> complexMatchers; + private final Map> keySettings; private final Setting.Scope scope; private static final Pattern KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])*[-\\w]+$"); private static final Pattern GROUP_KEY_PATTERN = Pattern.compile("^(?:[-\\w]+[.])+$"); - protected AbstractScopedSettings(Settings settings, Set> settingsSet, Setting.Scope scope) { super(settings); this.lastSettingsApplied = Settings.EMPTY; this.scope = scope; - for (Setting entry : settingsSet) { - addSetting(entry); + Map> complexMatchers = new HashMap<>(); + Map> keySettings = new HashMap<>(); + for (Setting setting : settingsSet) { + if (setting.getScope() != scope) { + throw new IllegalArgumentException("Setting must be a " + scope + " setting but was: " + setting.getScope()); + } + if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())) == false) { + throw new IllegalArgumentException("illegal settings key: [" + setting.getKey() + "]"); + } + if (setting.hasComplexMatcher()) { + complexMatchers.putIfAbsent(setting.getKey(), setting); + } else { + keySettings.putIfAbsent(setting.getKey(), setting); + } } + this.complexMatchers = Collections.unmodifiableMap(complexMatchers); + this.keySettings = Collections.unmodifiableMap(keySettings); } protected AbstractScopedSettings(Settings nodeSettings, Settings scopeSettings, AbstractScopedSettings other) { super(nodeSettings); this.lastSettingsApplied = scopeSettings; this.scope = other.scope; - complexMatchers.putAll(other.complexMatchers); - keySettings.putAll(other.keySettings); + complexMatchers = other.complexMatchers; + keySettings = other.keySettings; settingUpdaters.addAll(other.settingUpdaters); } - protected final void addSetting(Setting setting) { - if (setting.getScope() != scope) { - throw new IllegalArgumentException("Setting must be a " + scope + " setting but was: " + setting.getScope()); - } - if (isValidKey(setting.getKey()) == false && (setting.isGroupSetting() && isValidGroupKey(setting.getKey())) == false) { - throw new IllegalArgumentException("illegal settings key: [" + setting.getKey() + "]"); - } - if (setting.hasComplexMatcher()) { - complexMatchers.putIfAbsent(setting.getKey(), setting); - } else { - keySettings.putIfAbsent(setting.getKey(), setting); - } - } - /** * Returns true iff the given key is a valid settings key otherwise false */ diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java index b09d91b1c20..4688fba5034 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java @@ -258,8 +258,8 @@ public final class IndexModule { throw new IllegalStateException("store must not be null"); } } - indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate); indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType); + indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate); final String queryCacheType = indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING); final BiFunction queryCacheProvider = queryCaches.get(queryCacheType); final QueryCache queryCache = queryCacheProvider.apply(indexSettings, servicesProvider.getIndicesQueryCache()); diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index 715bea51694..dc002135966 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -167,10 +167,6 @@ public final class IndexSettings { this(indexMetaData, nodeSettings, (index) -> Regex.simpleMatch(index, indexMetaData.getIndex()), IndexScopedSettings.DEFAULT_SCOPED_SETTINGS); } - IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSettings, IndexScopedSettings indexScopedSettings) { - this(indexMetaData, nodeSettings, (index) -> Regex.simpleMatch(index, indexMetaData.getIndex()), indexScopedSettings); - } - /** * Creates a new {@link IndexSettings} instance. The given node settings will be merged with the settings in the metadata * while index level settings will overwrite node settings. @@ -457,5 +453,5 @@ public final class IndexSettings { } - public IndexScopedSettings getScopedSettings() { return scopedSettings;} + IndexScopedSettings getScopedSettings() { return scopedSettings;} } diff --git a/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java b/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java index 362e9099ee2..bbe8c566de9 100644 --- a/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java +++ b/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java @@ -23,6 +23,7 @@ import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.TieredMergePolicy; import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; @@ -137,16 +138,17 @@ public final class MergePolicyConfig { public static final String INDEX_MERGE_ENABLED = "index.merge.enabled"; // don't convert to Setting<> and register... we only set this in tests and register via a plugin - MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) { + MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) { this.logger = logger; - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_COMPOUND_FORMAT_SETTING, this::setNoCFSRatio); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, this::expungeDeletesAllowed); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, this::floorSegmentSetting); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, this::maxMergesAtOnce); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, this::maxMergesAtOnceExplicit); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, this::maxMergedSegment); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, this::segmentsPerTier); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, this::reclaimDeletesWeight); + IndexScopedSettings scopedSettings = indexSettings.getScopedSettings(); + scopedSettings.addSettingsUpdateConsumer(INDEX_COMPOUND_FORMAT_SETTING, this::setNoCFSRatio); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, this::expungeDeletesAllowed); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, this::floorSegmentSetting); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, this::maxMergesAtOnce); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, this::maxMergesAtOnceExplicit); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, this::maxMergedSegment); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, this::segmentsPerTier); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, this::reclaimDeletesWeight); double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING); int maxMergeAtOnce = indexSettings.getValue(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING); @@ -168,8 +170,10 @@ public final class MergePolicyConfig { mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac()); mergePolicy.setSegmentsPerTier(segmentsPerTier); mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight); - logger.debug("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]", + if (logger.isTraceEnabled()) { + logger.trace("using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}], max_merge_at_once[{}], max_merge_at_once_explicit[{}], max_merged_segment[{}], segments_per_tier[{}], reclaim_deletes_weight[{}]", forceMergeDeletesPctAllowed, floorSegment, maxMergeAtOnce, maxMergeAtOnceExplicit, maxMergedSegment, segmentsPerTier, reclaimDeletesWeight); + } } private void reclaimDeletesWeight(Double reclaimDeletesWeight) { diff --git a/core/src/main/java/org/elasticsearch/index/store/IndexStore.java b/core/src/main/java/org/elasticsearch/index/store/IndexStore.java index 29401fdfd51..e98ad7cc6eb 100644 --- a/core/src/main/java/org/elasticsearch/index/store/IndexStore.java +++ b/core/src/main/java/org/elasticsearch/index/store/IndexStore.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.store; import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.IndexSettings; @@ -30,16 +29,17 @@ import org.elasticsearch.index.shard.ShardPath; * */ public class IndexStore extends AbstractIndexComponent { - public static final Setting INDEX_STORE_THROTTLE_TYPE_SETTING = new Setting<>("index.store.throttle.type", "none", StoreRateLimiting.Type::fromString, true, Setting.Scope.INDEX) ; + public static final Setting INDEX_STORE_THROTTLE_TYPE_SETTING = new Setting<>("index.store.throttle.type", "none", IndexRateLimitingType::fromString, true, Setting.Scope.INDEX) ; public static final Setting INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting("index.store.throttle.max_bytes_per_sec", new ByteSizeValue(0), true, Setting.Scope.INDEX); protected final IndexStoreConfig indexStoreConfig; private final StoreRateLimiting rateLimiting = new StoreRateLimiting(); + private volatile IndexRateLimitingType type; public IndexStore(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig) { super(indexSettings); this.indexStoreConfig = indexStoreConfig; - rateLimiting.setType(indexSettings.getValue(INDEX_STORE_THROTTLE_TYPE_SETTING)); + setType(indexSettings.getValue(INDEX_STORE_THROTTLE_TYPE_SETTING)); rateLimiting.setMaxRate(indexSettings.getValue(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING)); logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimiting.getType(), rateLimiting.getRateLimiter()); } @@ -49,7 +49,7 @@ public class IndexStore extends AbstractIndexComponent { * the node level one (defaults to the node level one). */ public StoreRateLimiting rateLimiting() { - return rateLimiting.getType() == StoreRateLimiting.Type.NONE ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting; + return type.useStoreLimiter() ? indexStoreConfig.getNodeRateLimiter() : this.rateLimiting; } /** @@ -59,11 +59,44 @@ public class IndexStore extends AbstractIndexComponent { return new FsDirectoryService(indexSettings, this, path); } - public void setType(StoreRateLimiting.Type type) { - rateLimiting.setType(type); + public void setType(IndexRateLimitingType type) { + this.type = type; + if (type.useStoreLimiter() == false) { + rateLimiting.setType(type.type); + } } public void setMaxRate(ByteSizeValue rate) { rateLimiting.setMaxRate(rate); } + + /** + * On an index level we can configure all of {@link org.apache.lucene.store.StoreRateLimiting.Type} as well as + * node which will then use a global rate limiter that has it's own configuration. The global one is + * configured in {@link IndexStoreConfig} which is managed by the per-node {@link org.elasticsearch.indices.IndicesService} + */ + public static final class IndexRateLimitingType { + private final StoreRateLimiting.Type type; + + private IndexRateLimitingType(StoreRateLimiting.Type type) { + this.type = type; + } + + private boolean useStoreLimiter() { + return type == null; + } + + static IndexRateLimitingType fromString(String type) { + if ("node".equalsIgnoreCase(type)) { + return new IndexRateLimitingType(null); + } else { + try { + return new IndexRateLimitingType(StoreRateLimiting.Type.fromString(type)); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none|node]"); + } + } + } + } + } diff --git a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index dc483932fec..d1075031ee1 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/core/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -111,11 +111,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe } for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { - IndexSettings indexSettings = new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings); // Note, closed indices will not have any routing information, so won't be deleted for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) { ShardId shardId = indexShardRoutingTable.shardId(); + IndexService indexService = indicesService.indexService(indexRoutingTable.getIndex()); + IndexSettings indexSettings = indexService != null ? indexService.getIndexSettings() : new IndexSettings(event.state().getMetaData().index(indexRoutingTable.index()), settings); if (indicesService.canDeleteShardContent(shardId, indexSettings)) { deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable); } diff --git a/core/src/test/java/org/elasticsearch/index/store/IndexStoreTests.java b/core/src/test/java/org/elasticsearch/index/store/IndexStoreTests.java index 300e4bb9ab4..214c86a498a 100644 --- a/core/src/test/java/org/elasticsearch/index/store/IndexStoreTests.java +++ b/core/src/test/java/org/elasticsearch/index/store/IndexStoreTests.java @@ -25,6 +25,7 @@ import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.NoLockFactory; import org.apache.lucene.store.SimpleFSDirectory; +import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.util.Constants; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -95,4 +96,24 @@ public class IndexStoreTests extends ESTestCase { } } } + + public void testUpdateThrottleType() throws IOException { + Settings settings = Settings.settingsBuilder().put(IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING.getKey(), "all") + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(new Index("foo"), settings); + IndexStoreConfig indexStoreConfig = new IndexStoreConfig(settings); + IndexStore store = new IndexStore(indexSettings, indexStoreConfig); + assertEquals(StoreRateLimiting.Type.NONE, store.rateLimiting().getType()); + assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType()); + assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting()); + + store.setType(IndexStore.IndexRateLimitingType.fromString("NODE")); + assertEquals(StoreRateLimiting.Type.ALL, store.rateLimiting().getType()); + assertSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting()); + + store.setType(IndexStore.IndexRateLimitingType.fromString("merge")); + assertEquals(StoreRateLimiting.Type.MERGE, store.rateLimiting().getType()); + assertNotSame(indexStoreConfig.getNodeRateLimiter(), store.rateLimiting()); + assertEquals(StoreRateLimiting.Type.ALL, indexStoreConfig.getNodeRateLimiter().getType()); + } } diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 4cbf436a743..dac6ac0904d 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1540,7 +1540,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas // Update settings to back to normal assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(Settings.builder() - .put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "none") + .put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "node") )); logger.info("--> wait for snapshot to complete"); From ebc9dcf0c7f80611e163016035aec4ae1ed00a50 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 21 Jan 2016 11:05:51 +0100 Subject: [PATCH 2/2] Cleanup index settings updaters --- .../gateway/PrimaryShardAllocator.java | 14 +++++----- .../elasticsearch/index/IndexSettings.java | 26 ++++++++++++++----- .../index/MergePolicyConfig.java | 24 ++++++----------- .../index/MergeSchedulerConfig.java | 5 ---- 4 files changed, 34 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java index 0098e93d5a4..3d3a0e3b59c 100644 --- a/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java @@ -91,8 +91,8 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { } final IndexMetaData indexMetaData = metaData.index(shard.getIndex()); - final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); - + // don't go wild here and create a new IndexSetting object for every shard this could cause a lot of garbage + // on cluster restart if we allocate a boat load of shards if (shard.allocatedPostIndexCreate(indexMetaData) == false) { // when we create a fresh index continue; @@ -108,13 +108,13 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { final Set lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id()); final boolean snapshotRestore = shard.restoreSource() != null; - final boolean recoverOnAnyNode = recoverOnAnyNode(indexSettings); + final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData); final NodesAndVersions nodesAndVersions; final boolean enoughAllocationsFound; if (lastActiveAllocationIds.isEmpty()) { - assert indexSettings.getIndexVersionCreated().before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new"; + assert Version.indexCreated(indexMetaData.getSettings()).before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new"; // when we load an old index (after upgrading cluster) or restore a snapshot of an old index // fall back to old version-based allocation mode // Note that once the shard has been active, lastActiveAllocationIds will be non-empty @@ -356,9 +356,9 @@ public abstract class PrimaryShardAllocator extends AbstractComponent { * Return {@code true} if the index is configured to allow shards to be * recovered on any node */ - private boolean recoverOnAnyNode(IndexSettings indexSettings) { - return indexSettings.isOnSharedFilesystem() - && IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING.get(indexSettings.getSettings()); + private boolean recoverOnAnyNode(IndexMetaData metaData) { + return (IndexMetaData.isOnSharedFilesystem(metaData.getSettings()) || IndexMetaData.isOnSharedFilesystem(this.settings)) + && IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING.get(metaData.getSettings(), this.settings); } protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); diff --git a/core/src/main/java/org/elasticsearch/index/IndexSettings.java b/core/src/main/java/org/elasticsearch/index/IndexSettings.java index dc002135966..17fe4ef499d 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/core/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.translog.Translog; import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -196,24 +197,35 @@ public final class IndexSettings { this.defaultAllowUnmappedFields = scopedSettings.get(ALLOW_UNMAPPED); this.indexNameMatcher = indexNameMatcher; this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING); - scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability); syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings); refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING); - scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTTING); - scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTTING, this::setTranslogFlushThresholdSize); mergeSchedulerConfig = new MergeSchedulerConfig(this); - scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes); gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); - scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer); maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING); - scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow); TTLPurgeDisabled = scopedSettings.get(INDEX_TTL_DISABLE_PURGE_SETTING); - scopedSettings.addSettingsUpdateConsumer(INDEX_TTL_DISABLE_PURGE_SETTING, this::setTTLPurgeDisabled); this.mergePolicyConfig = new MergePolicyConfig(logger, this); assert indexNameMatcher.test(indexMetaData.getIndex()); + scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio); + scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed); + scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, mergePolicyConfig::setFloorSegmentSetting); + scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, mergePolicyConfig::setMaxMergesAtOnce); + scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, mergePolicyConfig::setMaxMergesAtOnceExplicit); + scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, mergePolicyConfig::setMaxMergedSegment); + scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, mergePolicyConfig::setSegmentsPerTier); + scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, mergePolicyConfig::setReclaimDeletesWeight); + scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING, mergeSchedulerConfig::setMaxThreadCount); + scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING, mergeSchedulerConfig::setMaxMergeCount); + scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability); + scopedSettings.addSettingsUpdateConsumer(INDEX_TTL_DISABLE_PURGE_SETTING, this::setTTLPurgeDisabled); + scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow); + scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer); + scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes); + scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTTING, this::setTranslogFlushThresholdSize); + scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval); } private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) { diff --git a/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java b/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java index bbe8c566de9..fc9f30cf3fd 100644 --- a/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java +++ b/core/src/main/java/org/elasticsearch/index/MergePolicyConfig.java @@ -141,14 +141,6 @@ public final class MergePolicyConfig { MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) { this.logger = logger; IndexScopedSettings scopedSettings = indexSettings.getScopedSettings(); - scopedSettings.addSettingsUpdateConsumer(INDEX_COMPOUND_FORMAT_SETTING, this::setNoCFSRatio); - scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, this::expungeDeletesAllowed); - scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, this::floorSegmentSetting); - scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, this::maxMergesAtOnce); - scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, this::maxMergesAtOnceExplicit); - scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, this::maxMergedSegment); - scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, this::segmentsPerTier); - scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, this::reclaimDeletesWeight); double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING); int maxMergeAtOnce = indexSettings.getValue(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING); @@ -176,35 +168,35 @@ public final class MergePolicyConfig { } } - private void reclaimDeletesWeight(Double reclaimDeletesWeight) { + void setReclaimDeletesWeight(Double reclaimDeletesWeight) { mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight); } - private void segmentsPerTier(Double segmentsPerTier) { + void setSegmentsPerTier(Double segmentsPerTier) { mergePolicy.setSegmentsPerTier(segmentsPerTier); } - private void maxMergedSegment(ByteSizeValue maxMergedSegment) { + void setMaxMergedSegment(ByteSizeValue maxMergedSegment) { mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac()); } - private void maxMergesAtOnceExplicit(Integer maxMergeAtOnceExplicit) { + void setMaxMergesAtOnceExplicit(Integer maxMergeAtOnceExplicit) { mergePolicy.setMaxMergeAtOnceExplicit(maxMergeAtOnceExplicit); } - private void maxMergesAtOnce(Integer maxMergeAtOnce) { + void setMaxMergesAtOnce(Integer maxMergeAtOnce) { mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); } - private void floorSegmentSetting(ByteSizeValue floorSegementSetting) { + void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) { mergePolicy.setFloorSegmentMB(floorSegementSetting.mbFrac()); } - private void expungeDeletesAllowed(Double value) { + void setExpungeDeletesAllowed(Double value) { mergePolicy.setForceMergeDeletesPctAllowed(value); } - private void setNoCFSRatio(Double noCFSRatio) { + void setNoCFSRatio(Double noCFSRatio) { mergePolicy.setNoCFSRatio(noCFSRatio); } diff --git a/core/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java b/core/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java index 59576f1869b..0d212a4eb30 100644 --- a/core/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java +++ b/core/src/main/java/org/elasticsearch/index/MergeSchedulerConfig.java @@ -21,9 +21,7 @@ package org.elasticsearch.index; import org.apache.lucene.index.ConcurrentMergeScheduler; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.index.IndexSettings; /** * The merge scheduler (ConcurrentMergeScheduler) controls the execution of @@ -62,9 +60,6 @@ public final class MergeSchedulerConfig { private volatile int maxMergeCount; MergeSchedulerConfig(IndexSettings indexSettings) { - indexSettings.getScopedSettings().addSettingsUpdateConsumer(MAX_THREAD_COUNT_SETTING, this::setMaxThreadCount); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(MAX_MERGE_COUNT_SETTING, this::setMaxMergeCount); - indexSettings.getScopedSettings().addSettingsUpdateConsumer(AUTO_THROTTLE_SETTING, this::setAutoThrottle); maxThreadCount = indexSettings.getValue(MAX_THREAD_COUNT_SETTING); maxMergeCount = indexSettings.getValue(MAX_MERGE_COUNT_SETTING); this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);