mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 22:36:20 +00:00
Cleanup index settings updaters
This commit is contained in:
parent
d9422b5e89
commit
ebc9dcf0c7
@ -91,8 +91,8 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||
}
|
||||
|
||||
final IndexMetaData indexMetaData = metaData.index(shard.getIndex());
|
||||
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
|
||||
|
||||
// don't go wild here and create a new IndexSetting object for every shard this could cause a lot of garbage
|
||||
// on cluster restart if we allocate a boat load of shards
|
||||
if (shard.allocatedPostIndexCreate(indexMetaData) == false) {
|
||||
// when we create a fresh index
|
||||
continue;
|
||||
@ -108,13 +108,13 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||
|
||||
final Set<String> lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id());
|
||||
final boolean snapshotRestore = shard.restoreSource() != null;
|
||||
final boolean recoverOnAnyNode = recoverOnAnyNode(indexSettings);
|
||||
final boolean recoverOnAnyNode = recoverOnAnyNode(indexMetaData);
|
||||
|
||||
final NodesAndVersions nodesAndVersions;
|
||||
final boolean enoughAllocationsFound;
|
||||
|
||||
if (lastActiveAllocationIds.isEmpty()) {
|
||||
assert indexSettings.getIndexVersionCreated().before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new";
|
||||
assert Version.indexCreated(indexMetaData.getSettings()).before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new";
|
||||
// when we load an old index (after upgrading cluster) or restore a snapshot of an old index
|
||||
// fall back to old version-based allocation mode
|
||||
// Note that once the shard has been active, lastActiveAllocationIds will be non-empty
|
||||
@ -356,9 +356,9 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
||||
* Return {@code true} if the index is configured to allow shards to be
|
||||
* recovered on any node
|
||||
*/
|
||||
private boolean recoverOnAnyNode(IndexSettings indexSettings) {
|
||||
return indexSettings.isOnSharedFilesystem()
|
||||
&& IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING.get(indexSettings.getSettings());
|
||||
private boolean recoverOnAnyNode(IndexMetaData metaData) {
|
||||
return (IndexMetaData.isOnSharedFilesystem(metaData.getSettings()) || IndexMetaData.isOnSharedFilesystem(this.settings))
|
||||
&& IndexMetaData.INDEX_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE_SETTING.get(metaData.getSettings(), this.settings);
|
||||
}
|
||||
|
||||
protected abstract AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);
|
||||
|
@ -36,6 +36,7 @@ import org.elasticsearch.index.translog.Translog;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
@ -196,24 +197,35 @@ public final class IndexSettings {
|
||||
this.defaultAllowUnmappedFields = scopedSettings.get(ALLOW_UNMAPPED);
|
||||
this.indexNameMatcher = indexNameMatcher;
|
||||
this.durability = scopedSettings.get(INDEX_TRANSLOG_DURABILITY_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
|
||||
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
|
||||
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
|
||||
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTTING, this::setTranslogFlushThresholdSize);
|
||||
mergeSchedulerConfig = new MergeSchedulerConfig(this);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
|
||||
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
|
||||
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
|
||||
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow);
|
||||
TTLPurgeDisabled = scopedSettings.get(INDEX_TTL_DISABLE_PURGE_SETTING);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TTL_DISABLE_PURGE_SETTING, this::setTTLPurgeDisabled);
|
||||
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
|
||||
assert indexNameMatcher.test(indexMetaData.getIndex());
|
||||
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, mergePolicyConfig::setFloorSegmentSetting);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, mergePolicyConfig::setMaxMergesAtOnce);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, mergePolicyConfig::setMaxMergesAtOnceExplicit);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, mergePolicyConfig::setMaxMergedSegment);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, mergePolicyConfig::setSegmentsPerTier);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, mergePolicyConfig::setReclaimDeletesWeight);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING, mergeSchedulerConfig::setMaxThreadCount);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING, mergeSchedulerConfig::setMaxMergeCount);
|
||||
scopedSettings.addSettingsUpdateConsumer(MergeSchedulerConfig.AUTO_THROTTLE_SETTING, mergeSchedulerConfig::setAutoThrottle);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_DURABILITY_SETTING, this::setTranslogDurability);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TTL_DISABLE_PURGE_SETTING, this::setTTLPurgeDisabled);
|
||||
scopedSettings.addSettingsUpdateConsumer(MAX_RESULT_WINDOW_SETTING, this::setMaxResultWindow);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_WARMER_ENABLED_SETTING, this::setEnableWarmer);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_GC_DELETES_SETTING, this::setGCDeletes);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTTING, this::setTranslogFlushThresholdSize);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
|
||||
}
|
||||
|
||||
private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
|
||||
|
@ -141,14 +141,6 @@ public final class MergePolicyConfig {
|
||||
MergePolicyConfig(ESLogger logger, IndexSettings indexSettings) {
|
||||
this.logger = logger;
|
||||
IndexScopedSettings scopedSettings = indexSettings.getScopedSettings();
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_COMPOUND_FORMAT_SETTING, this::setNoCFSRatio);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, this::expungeDeletesAllowed);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, this::floorSegmentSetting);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, this::maxMergesAtOnce);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_EXPLICIT_SETTING, this::maxMergesAtOnceExplicit);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, this::maxMergedSegment);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, this::segmentsPerTier);
|
||||
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, this::reclaimDeletesWeight);
|
||||
double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage
|
||||
ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING);
|
||||
int maxMergeAtOnce = indexSettings.getValue(INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING);
|
||||
@ -176,35 +168,35 @@ public final class MergePolicyConfig {
|
||||
}
|
||||
}
|
||||
|
||||
private void reclaimDeletesWeight(Double reclaimDeletesWeight) {
|
||||
void setReclaimDeletesWeight(Double reclaimDeletesWeight) {
|
||||
mergePolicy.setReclaimDeletesWeight(reclaimDeletesWeight);
|
||||
}
|
||||
|
||||
private void segmentsPerTier(Double segmentsPerTier) {
|
||||
void setSegmentsPerTier(Double segmentsPerTier) {
|
||||
mergePolicy.setSegmentsPerTier(segmentsPerTier);
|
||||
}
|
||||
|
||||
private void maxMergedSegment(ByteSizeValue maxMergedSegment) {
|
||||
void setMaxMergedSegment(ByteSizeValue maxMergedSegment) {
|
||||
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.mbFrac());
|
||||
}
|
||||
|
||||
private void maxMergesAtOnceExplicit(Integer maxMergeAtOnceExplicit) {
|
||||
void setMaxMergesAtOnceExplicit(Integer maxMergeAtOnceExplicit) {
|
||||
mergePolicy.setMaxMergeAtOnceExplicit(maxMergeAtOnceExplicit);
|
||||
}
|
||||
|
||||
private void maxMergesAtOnce(Integer maxMergeAtOnce) {
|
||||
void setMaxMergesAtOnce(Integer maxMergeAtOnce) {
|
||||
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
|
||||
}
|
||||
|
||||
private void floorSegmentSetting(ByteSizeValue floorSegementSetting) {
|
||||
void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) {
|
||||
mergePolicy.setFloorSegmentMB(floorSegementSetting.mbFrac());
|
||||
}
|
||||
|
||||
private void expungeDeletesAllowed(Double value) {
|
||||
void setExpungeDeletesAllowed(Double value) {
|
||||
mergePolicy.setForceMergeDeletesPctAllowed(value);
|
||||
}
|
||||
|
||||
private void setNoCFSRatio(Double noCFSRatio) {
|
||||
void setNoCFSRatio(Double noCFSRatio) {
|
||||
mergePolicy.setNoCFSRatio(noCFSRatio);
|
||||
}
|
||||
|
||||
|
@ -21,9 +21,7 @@ package org.elasticsearch.index;
|
||||
|
||||
import org.apache.lucene.index.ConcurrentMergeScheduler;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
|
||||
/**
|
||||
* The merge scheduler (<code>ConcurrentMergeScheduler</code>) controls the execution of
|
||||
@ -62,9 +60,6 @@ public final class MergeSchedulerConfig {
|
||||
private volatile int maxMergeCount;
|
||||
|
||||
MergeSchedulerConfig(IndexSettings indexSettings) {
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(MAX_THREAD_COUNT_SETTING, this::setMaxThreadCount);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(MAX_MERGE_COUNT_SETTING, this::setMaxMergeCount);
|
||||
indexSettings.getScopedSettings().addSettingsUpdateConsumer(AUTO_THROTTLE_SETTING, this::setAutoThrottle);
|
||||
maxThreadCount = indexSettings.getValue(MAX_THREAD_COUNT_SETTING);
|
||||
maxMergeCount = indexSettings.getValue(MAX_MERGE_COUNT_SETTING);
|
||||
this.autoThrottle = indexSettings.getValue(AUTO_THROTTLE_SETTING);
|
||||
|
Loading…
x
Reference in New Issue
Block a user