Move all dynamic settings and their config classes to the index level

Today we maintain a lot of settings on the shard level which are all index level settings.
In order to cut over to the new settings API where we register update listener we have to move
all of them on to the index level otherwise we need a way to un-register listeners which is error-prone
and requires additional handling when shards are closed. It's simpler and also more accurate to handle all of
them on the index level where we can trash the entire registry for update listener once the index goes out of scope.
This commit is contained in:
Simon Willnauer 2016-01-13 14:25:37 +01:00
parent 13c0b1932b
commit 437d7c179a
28 changed files with 252 additions and 255 deletions

View File

@ -66,13 +66,11 @@ import org.elasticsearch.common.util.ExtensionPoint;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.IndexingSlowLog;
import org.elasticsearch.index.search.stats.SearchSlowLog;
import org.elasticsearch.index.settings.IndexDynamicSettings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
@ -150,8 +148,8 @@ public class ClusterModule extends AbstractModule {
registerIndexDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE, Validator.EMPTY);
registerIndexDynamicSetting(IndexSettings.INDEX_REFRESH_INTERVAL, Validator.TIME);
registerIndexDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, Validator.EMPTY);
registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
registerIndexDynamicSetting(IndexSettings.INDEX_GC_DELETES_SETTING, Validator.TIME);
registerIndexDynamicSetting(IndexSettings.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
@ -178,7 +176,7 @@ public class ClusterModule extends AbstractModule {
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY);
registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
registerIndexDynamicSetting(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
registerIndexDynamicSetting(IndexSettings.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY);
registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);

View File

@ -61,6 +61,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.search.stats.SearchSlowLog;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard;
@ -108,6 +109,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final IndexingOperationListener[] listeners;
private volatile AsyncRefreshTask refreshTask;
private final AsyncTranslogFSync fsyncTask;
private final SearchSlowLog searchSlowLog;
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
SimilarityService similarityService,
@ -151,6 +153,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
this.fsyncTask = null;
}
this.refreshTask = new AsyncRefreshTask(this);
searchSlowLog = new SearchSlowLog(indexSettings.getSettings());
}
public int numberOfShards() {
@ -313,9 +316,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId)));
if (useShadowEngine(primary, indexSettings)) {
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); // no indexing listeners - shadow engines don't index
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog); // no indexing listeners - shadow engines don't index
} else {
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, listeners);
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, listeners);
}
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
@ -414,6 +417,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
return nodeServicesProvider.getThreadPool();
}
public SearchSlowLog getSearchSlowLog() {
return searchSlowLog;
}
private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId;
private final boolean ownsShard;
@ -562,9 +569,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
final Settings settings = indexSettings.getSettings();
for (final IndexShard shard : this.shards.values()) {
try {
shard.onRefreshSettings(settings);
shard.onSettingsChanged();
} catch (Exception e) {
logger.warn("[{}] failed to refresh shard settings", e, shard.shardId().id());
logger.warn("[{}] failed to notify shard about setting change", e, shard.shardId().id());
}
}
try {
@ -577,6 +584,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
} catch (Exception e) {
logger.warn("failed to refresh slowlog settings", e);
}
try {
searchSlowLog.onRefreshSettings(settings); // this will be refactored soon anyway so duplication is ok here
} catch (Exception e) {
logger.warn("failed to refresh slowlog settings", e);
}
if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
rescheduleRefreshTasks();
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.index;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseFieldMatcher;
@ -25,11 +26,11 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
@ -59,6 +60,19 @@ public final class IndexSettings {
public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability";
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
/**
* Index setting to control if a flush is executed before engine is closed
* This setting is realtime updateable.
*/
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
/**
* Index setting to enable / disable deletes garbage collection.
* This setting is realtime updateable
*/
public static final String INDEX_GC_DELETES_SETTING = "index.gc_deletes";
private final String uuid;
private final List<Consumer<Settings>> updateListeners;
@ -82,7 +96,12 @@ public final class IndexSettings {
private volatile Translog.Durability durability;
private final TimeValue syncInterval;
private volatile TimeValue refreshInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile boolean flushOnClose = true;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
/**
@ -165,6 +184,11 @@ public final class IndexSettings {
this.durability = getFromSettings(settings, Translog.Durability.REQUEST);
syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, DEFAULT_REFRESH_INTERVAL);
flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
flushOnClose = settings.getAsBoolean(IndexSettings.INDEX_FLUSH_ON_CLOSE, true);
mergeSchedulerConfig = new MergeSchedulerConfig(settings);
gcDeletesInMillis = settings.getAsTime(IndexSettings.INDEX_GC_DELETES_SETTING, DEFAULT_GC_DELETES).getMillis();
this.mergePolicyConfig = new MergePolicyConfig(logger, settings);
assert indexNameMatcher.test(indexMetaData.getIndex());
}
@ -360,13 +384,88 @@ public final class IndexSettings {
logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
this.refreshInterval = refreshInterval;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize);
if (!flushThresholdSize.equals(this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize);
this.flushThresholdSize = flushThresholdSize;
}
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose);
if (flushOnClose != this.flushOnClose) {
logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, this.flushOnClose, flushOnClose);
this.flushOnClose = flushOnClose;
}
final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
mergeSchedulerConfig.setMaxThreadCount(maxThreadCount);
}
final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
}
final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle());
if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle);
mergeSchedulerConfig.setAutoThrottle(autoThrottle);
}
long gcDeletesInMillis = settings.getAsTime(IndexSettings.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(this.gcDeletesInMillis)).getMillis();
if (gcDeletesInMillis != this.gcDeletesInMillis) {
logger.info("updating {} from [{}] to [{}]", IndexSettings.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis));
this.gcDeletesInMillis = gcDeletesInMillis;
}
mergePolicyConfig.onRefreshSettings(settings);
}
/**
* Returns the translog sync interval. This is the interval in which the transaction log is asynchronously fsynced unless
* the transaction log is fsyncing on every operations
*/
public TimeValue getTranslogSyncInterval() {
return syncInterval;
}
/**
* Returns this interval in which the shards of this index are asynchronously refreshed. <tt>-1</tt> means async refresh is disabled.
*/
public TimeValue getRefreshInterval() {
return refreshInterval;
}
/**
* Returns the transaction log threshold size when to forcefully flush the index and clear the transaction log.
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }
/**
* Returns <code>true</code> iff this index should be flushed on close. Default is <code>true</code>
*/
public boolean isFlushOnClose() { return flushOnClose; }
/**
* Returns the {@link MergeSchedulerConfig}
*/
public MergeSchedulerConfig getMergeSchedulerConfig() { return mergeSchedulerConfig; }
/**
* Returns the GC deletes cycle in milliseconds.
*/
public long getGcDeletesInMillis() {
return gcDeletesInMillis;
}
/**
* Returns the merge policy that should be used for this index.
*/
public MergePolicy getMergePolicy() {
return mergePolicyConfig.getMergePolicy();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.shard;
package org.elasticsearch.index;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
@ -138,7 +138,7 @@ public final class MergePolicyConfig {
public static final String INDEX_MERGE_ENABLED = "index.merge.enabled";
public MergePolicyConfig(ESLogger logger, Settings indexSettings) {
MergePolicyConfig(ESLogger logger, Settings indexSettings) {
this.logger = logger;
this.noCFSRatio = parseNoCFSRatio(indexSettings.get(INDEX_COMPOUND_FORMAT, Double.toString(TieredMergePolicy.DEFAULT_NO_CFS_RATIO)));
double forceMergeDeletesPctAllowed = indexSettings.getAsDouble("index.merge.policy.expunge_deletes_allowed", DEFAULT_EXPUNGE_DELETES_ALLOWED); // percentage
@ -180,11 +180,11 @@ public final class MergePolicyConfig {
return maxMergeAtOnce;
}
public MergePolicy getMergePolicy() {
MergePolicy getMergePolicy() {
return mergesEnabled ? mergePolicy : NoMergePolicy.INSTANCE;
}
public void onRefreshSettings(Settings settings) {
void onRefreshSettings(Settings settings) {
final double oldExpungeDeletesPctAllowed = mergePolicy.getForceMergeDeletesPctAllowed();
final double expungeDeletesPctAllowed = settings.getAsDouble(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED, oldExpungeDeletesPctAllowed);
if (expungeDeletesPctAllowed != oldExpungeDeletesPctAllowed) {
@ -243,7 +243,7 @@ public final class MergePolicyConfig {
}
}
public static double parseNoCFSRatio(String noCFSRatio) {
private static double parseNoCFSRatio(String noCFSRatio) {
noCFSRatio = noCFSRatio.trim();
if (noCFSRatio.equalsIgnoreCase("true")) {
return 1.0d;
@ -262,7 +262,7 @@ public final class MergePolicyConfig {
}
}
public static String formatNoCFSRatio(double ratio) {
private static String formatNoCFSRatio(double ratio) {
if (ratio == 1.0) {
return Boolean.TRUE.toString();
} else if (ratio == 0.0) {

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.index.shard;
package org.elasticsearch.index;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.elasticsearch.common.settings.Settings;
@ -60,8 +60,7 @@ public final class MergeSchedulerConfig {
private volatile int maxThreadCount;
private volatile int maxMergeCount;
public MergeSchedulerConfig(IndexSettings indexSettings) {
final Settings settings = indexSettings.getSettings();
MergeSchedulerConfig(Settings settings) {
maxThreadCount = settings.getAsInt(MAX_THREAD_COUNT, Math.max(1, Math.min(4, EsExecutors.boundedNumberOfProcessors(settings) / 2)));
maxMergeCount = settings.getAsInt(MAX_MERGE_COUNT, maxThreadCount + 5);
this.autoThrottle = settings.getAsBoolean(AUTO_THROTTLE, true);
@ -78,7 +77,7 @@ public final class MergeSchedulerConfig {
/**
* Enables / disables auto throttling on the {@link ConcurrentMergeScheduler}
*/
public void setAutoThrottle(boolean autoThrottle) {
void setAutoThrottle(boolean autoThrottle) {
this.autoThrottle = autoThrottle;
}
@ -93,7 +92,7 @@ public final class MergeSchedulerConfig {
* Expert: directly set the maximum number of merge threads and
* simultaneous merges allowed.
*/
public void setMaxThreadCount(int maxThreadCount) {
void setMaxThreadCount(int maxThreadCount) {
this.maxThreadCount = maxThreadCount;
}
@ -108,7 +107,7 @@ public final class MergeSchedulerConfig {
*
* Expert: set the maximum number of simultaneous merges allowed.
*/
public void setMaxMergeCount(int maxMergeCount) {
void setMaxMergeCount(int maxMergeCount) {
this.maxMergeCount = maxMergeCount;
}
}

View File

@ -36,7 +36,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
@ -67,8 +67,8 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
private final MergeSchedulerConfig config;
public ElasticsearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings, MergeSchedulerConfig config) {
this.config = config;
public ElasticsearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
this.config = indexSettings.getMergeSchedulerConfig();
this.shardId = shardId;
this.indexSettings = indexSettings.getSettings();
this.logger = Loggers.getLogger(getClass(), this.indexSettings, shardId);

View File

@ -31,7 +31,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
@ -39,8 +38,6 @@ import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.TimeUnit;
/*
* Holds all the configuration that is used to create an {@link Engine}.
* Once {@link Engine} has been created with this object, changes to this
@ -51,7 +48,6 @@ public final class EngineConfig {
private final TranslogRecoveryPerformer translogRecoveryPerformer;
private final IndexSettings indexSettings;
private final ByteSizeValue indexingBufferSize;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private volatile boolean enableGcDeletes = true;
private final TimeValue flushMergesAfter;
private final String codecName;
@ -60,7 +56,6 @@ public final class EngineConfig {
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final MergePolicy mergePolicy;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final Analyzer analyzer;
private final Similarity similarity;
private final CodecService codecService;
@ -69,12 +64,6 @@ public final class EngineConfig {
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
/**
* Index setting to enable / disable deletes garbage collection.
* This setting is realtime updateable
*/
public static final String INDEX_GC_DELETES_SETTING = "index.gc_deletes";
/**
* Index setting to change the low level lucene codec used for writing new segments.
* This setting is <b>not</b> realtime updateable.
@ -84,8 +73,6 @@ public final class EngineConfig {
/** if set to true the engine will start even if the translog id in the commit point can not be found */
public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";
public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
private static final String DEFAULT_CODEC_NAME = "default";
private TranslogConfig translogConfig;
private boolean create = false;
@ -95,7 +82,7 @@ public final class EngineConfig {
*/
public EngineConfig(ShardId shardId, ThreadPool threadPool,
IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
MergePolicy mergePolicy,Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter) {
this.shardId = shardId;
@ -106,7 +93,6 @@ public final class EngineConfig {
this.store = store;
this.deletionPolicy = deletionPolicy;
this.mergePolicy = mergePolicy;
this.mergeSchedulerConfig = mergeSchedulerConfig;
this.analyzer = analyzer;
this.similarity = similarity;
this.codecService = codecService;
@ -116,7 +102,6 @@ public final class EngineConfig {
// there are not too many shards allocated to this node. Instead, IndexingMemoryController periodically checks
// and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high:
indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
this.translogRecoveryPerformer = translogRecoveryPerformer;
this.forceNewTranslog = settings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false);
this.queryCache = queryCache;
@ -146,19 +131,12 @@ public final class EngineConfig {
return indexingBufferSize;
}
/**
* Returns the GC deletes cycle in milliseconds.
*/
public long getGcDeletesInMillis() {
return gcDeletesInMillis;
}
/**
* Returns <code>true</code> iff delete garbage collection in the engine should be enabled. This setting is updateable
* in realtime and forces a volatile read. Consumers can safely read this value directly go fetch it's latest value. The default is <code>true</code>
* <p>
* Engine GC deletion if enabled collects deleted documents from in-memory realtime data structures after a certain amount of
* time ({@link #getGcDeletesInMillis()} if enabled. Before deletes are GCed they will cause re-adding the document that was deleted
* time ({@link IndexSettings#getGcDeletesInMillis()} if enabled. Before deletes are GCed they will cause re-adding the document that was deleted
* to fail.
* </p>
*/
@ -218,13 +196,6 @@ public final class EngineConfig {
return mergePolicy;
}
/**
* Returns the {@link MergeSchedulerConfig}
*/
public MergeSchedulerConfig getMergeSchedulerConfig() {
return mergeSchedulerConfig;
}
/**
* Returns a listener that should be called on engine failure
*/
@ -258,13 +229,6 @@ public final class EngineConfig {
return similarity;
}
/**
* Sets the GC deletes cycle in milliseconds.
*/
public void setGcDeletesInMillis(long gcDeletesInMillis) {
this.gcDeletesInMillis = gcDeletesInMillis;
}
/**
* Returns the {@link org.elasticsearch.index.shard.TranslogRecoveryPerformer} for this engine. This class is used
* to apply transaction log operations to the engine. It encapsulates all the logic to transfer the translog entry into

View File

@ -57,14 +57,12 @@ import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.translog.Translog;
@ -136,7 +134,7 @@ public class InternalEngine extends Engine {
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.warmer = engineConfig.getWarmer();
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings(), engineConfig.getMergeSchedulerConfig());
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
this.dirtyLocks = new Object[Runtime.getRuntime().availableProcessors() * 10]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
@ -370,7 +368,7 @@ public class InternalEngine extends Engine {
deleted = currentVersion == Versions.NOT_FOUND;
} else {
deleted = versionValue.delete();
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
@ -436,7 +434,7 @@ public class InternalEngine extends Engine {
private void maybePruneDeletedTombstones() {
// It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
// every 1/4 of gcDeletesInMillis:
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > engineConfig.getGcDeletesInMillis() * 0.25) {
if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
pruneDeletedTombstones();
}
}
@ -452,7 +450,7 @@ public class InternalEngine extends Engine {
deleted = currentVersion == Versions.NOT_FOUND;
} else {
deleted = versionValue.delete();
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
currentVersion = Versions.NOT_FOUND; // deleted, and GC
} else {
currentVersion = versionValue.version();
@ -701,7 +699,7 @@ public class InternalEngine extends Engine {
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
VersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
if (versionValue != null) {
if (timeMSec - versionValue.time() > engineConfig.getGcDeletesInMillis()) {
if (timeMSec - versionValue.time() > getGcDeletesInMillis()) {
versionMap.removeTombstoneUnderLock(uid);
}
}
@ -1072,7 +1070,7 @@ public class InternalEngine extends Engine {
}
long getGcDeletesInMillis() {
return engineConfig.getGcDeletesInMillis();
return engineConfig.getIndexSettings().getGcDeletesInMillis();
}
LiveIndexWriterConfig getCurrentIndexWriterConfig() {
@ -1083,8 +1081,8 @@ public class InternalEngine extends Engine {
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
private final AtomicBoolean isThrottling = new AtomicBoolean();
EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings, MergeSchedulerConfig config) {
super(shardId, indexSettings, config);
EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
super(shardId, indexSettings);
}
@Override

View File

@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;
/**
*/
public final class SearchSlowLog{
public final class SearchSlowLog {
private boolean reformat;
@ -62,7 +62,7 @@ public final class SearchSlowLog{
public static final String INDEX_SEARCH_SLOWLOG_REFORMAT = INDEX_SEARCH_SLOWLOG_PREFIX + ".reformat";
public static final String INDEX_SEARCH_SLOWLOG_LEVEL = INDEX_SEARCH_SLOWLOG_PREFIX + ".level";
SearchSlowLog(Settings indexSettings) {
public SearchSlowLog(Settings indexSettings) {
this.reformat = indexSettings.getAsBoolean(INDEX_SEARCH_SLOWLOG_REFORMAT, true);
@ -109,7 +109,7 @@ public final class SearchSlowLog{
}
}
synchronized void onRefreshSettings(Settings settings) {
public void onRefreshSettings(Settings settings) {
long queryWarnThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN, TimeValue.timeValueNanos(this.queryWarnThreshold)).nanos();
if (queryWarnThreshold != this.queryWarnThreshold) {
this.queryWarnThreshold = queryWarnThreshold;

View File

@ -41,8 +41,8 @@ public final class ShardSearchStats {
private final CounterMetric openContexts = new CounterMetric();
private volatile Map<String, StatsHolder> groupsStats = emptyMap();
public ShardSearchStats(Settings indexSettings) {
this.slowLogSearchService = new SearchSlowLog(indexSettings);
public ShardSearchStats(SearchSlowLog searchSlowLog) {
this.slowLogSearchService = searchSlowLog;
}
/**

View File

@ -47,7 +47,6 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
@ -92,6 +91,7 @@ import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchSlowLog;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.similarity.SimilarityService;
@ -141,7 +141,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final MapperService mapperService;
private final IndexCache indexCache;
private final Store store;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final InternalIndexingStats internalIndexingStats;
private final ShardSearchStats searchService;
private final ShardGetService getService;
@ -161,7 +160,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final SimilarityService similarityService;
private final EngineConfig engineConfig;
private final TranslogConfig translogConfig;
private final MergePolicyConfig mergePolicyConfig;
private final IndicesQueryCache indicesQueryCache;
private final IndexEventListener indexEventListener;
private final IndexSettings idxSettings;
@ -188,15 +186,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final MeanMetric flushMetric = new MeanMetric();
private final ShardEventListener shardEventListener = new ShardEventListener();
private volatile boolean flushOnClose = true;
private volatile ByteSizeValue flushThresholdSize;
/**
* Index setting to control if a flush is executed before engine is closed
* This setting is realtime updateable.
*/
public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
private final ShardPath path;
@ -215,7 +204,7 @@ public class IndexShard extends AbstractIndexShardComponent {
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, IndexingOperationListener... listeners) {
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, IndexingOperationListener... listeners) {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
this.idxSettings = indexSettings;
@ -227,7 +216,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
this.store = store;
this.indexEventListener = indexEventListener;
this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings);
this.threadPool = provider.getThreadPool();
this.mapperService = mapperService;
this.indexCache = indexCache;
@ -237,7 +225,7 @@ public class IndexShard extends AbstractIndexShardComponent {
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
this.getService = new ShardGetService(indexSettings, this, mapperService);
this.termVectorsService = provider.getTermVectorsService();
this.searchService = new ShardSearchStats(settings);
this.searchService = new ShardSearchStats(slowLog);
this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
this.indicesQueryCache = provider.getIndicesQueryCache();
this.shardQueryCache = new ShardRequestCache(shardId, indexSettings);
@ -245,9 +233,7 @@ public class IndexShard extends AbstractIndexShardComponent {
this.indexFieldDataService = indexFieldDataService;
this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
state = IndexShardState.CREATED;
this.flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
this.path = path;
this.mergePolicyConfig = new MergePolicyConfig(logger, settings);
/* create engine config */
logger.debug("state: [CREATED]");
@ -264,7 +250,6 @@ public class IndexShard extends AbstractIndexShardComponent {
}
this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
this.flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
this.provider = provider;
this.searcherWrapper = indexSearcherWrapper;
@ -817,7 +802,7 @@ public class IndexShard extends AbstractIndexShardComponent {
} finally {
final Engine engine = this.currentEngineReference.getAndSet(null);
try {
if (engine != null && flushEngine && this.flushOnClose) {
if (engine != null && flushEngine && indexSettings.isFlushOnClose()) {
engine.flushAndClose();
}
} finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times
@ -1048,10 +1033,6 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
public final boolean isFlushOnClose() {
return flushOnClose;
}
/**
* Deletes the shards metadata state. This method can only be executed if the shard is not active.
*
@ -1093,7 +1074,7 @@ public class IndexShard extends AbstractIndexShardComponent {
if (engine != null) {
try {
Translog translog = engine.getTranslog();
return translog.sizeInBytes() > flushThresholdSize.bytes();
return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().bytes();
} catch (AlreadyClosedException | EngineClosedException ex) {
// that's fine we are already close - no need to flush
}
@ -1101,57 +1082,10 @@ public class IndexShard extends AbstractIndexShardComponent {
return false;
}
public void onRefreshSettings(Settings settings) {
boolean change = false;
synchronized (mutex) {
if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
return;
}
ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize);
if (!flushThresholdSize.equals(this.flushThresholdSize)) {
logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize);
this.flushThresholdSize = flushThresholdSize;
}
final EngineConfig config = engineConfig;
final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose);
if (flushOnClose != this.flushOnClose) {
logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, this.flushOnClose, flushOnClose);
this.flushOnClose = flushOnClose;
}
long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
config.setGcDeletesInMillis(gcDeletesInMillis);
change = true;
}
final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
mergeSchedulerConfig.setMaxThreadCount(maxThreadCount);
change = true;
}
final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
change = true;
}
final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle());
if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle);
mergeSchedulerConfig.setAutoThrottle(autoThrottle);
change = true;
}
}
mergePolicyConfig.onRefreshSettings(settings);
searchService.onRefreshSettings(settings);
if (change) {
getEngine().onSettingsChanged();
public void onSettingsChanged() {
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null) {
engineOrNull.onSettingsChanged();
}
}
@ -1431,7 +1365,7 @@ public class IndexShard extends AbstractIndexShardComponent {
};
final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel);
return new EngineConfig(shardId,
threadPool, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
threadPool, indexSettings, engineWarmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.search.stats.SearchSlowLog;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogStats;
@ -44,8 +45,8 @@ import java.io.IOException;
public final class ShadowIndexShard extends IndexShard {
public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider) throws IOException {
super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider);
IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider, SearchSlowLog searchSlowLog) throws IOException {
super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider, searchSlowLog);
}
/**

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.admin.indices.segments;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.junit.Before;

View File

@ -50,7 +50,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.mapper.string.StringFieldMapperPositionIncrementGapTests;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;

View File

@ -181,7 +181,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
Settings idxSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB))
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB))
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
package org.elasticsearch.index;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
@ -24,6 +24,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;

View File

@ -21,52 +21,44 @@ package org.elasticsearch.index.engine;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.EngineAccess;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
public void testSettingsUpdate() {
final IndexService service = createIndex("foo");
// INDEX_COMPOUND_ON_FLUSH
InternalEngine engine = ((InternalEngine) EngineAccess.engine(service.getShardOrNull(0)));
assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(true));
// VERSION MAP SIZE
long indexBufferSize = engine.config().getIndexingBufferSize().bytes();
final int iters = between(1, 20);
for (int i = 0; i < iters; i++) {
boolean compoundOnFlush = randomBoolean();
// Tricky: TimeValue.parseTimeValue casts this long to a double, which steals 11 of the 64 bits for exponent, so we can't use
// the full long range here else the assert below fails:
long gcDeletes = random().nextLong() & (Long.MAX_VALUE >> 11);
Settings build = Settings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes, TimeUnit.MILLISECONDS)
.put(IndexSettings.INDEX_GC_DELETES_SETTING, gcDeletes, TimeUnit.MILLISECONDS)
.build();
assertEquals(gcDeletes, build.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, null).millis());
assertEquals(gcDeletes, build.getAsTime(IndexSettings.INDEX_GC_DELETES_SETTING, null).millis());
client().admin().indices().prepareUpdateSettings("foo").setSettings(build).get();
LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
assertEquals(currentIndexWriterConfig.getUseCompoundFile(), true);
assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.config().getIndexSettings().getGcDeletesInMillis(), gcDeletes);
assertEquals(engine.getGcDeletesInMillis(), gcDeletes);
indexBufferSize = engine.config().getIndexingBufferSize().bytes();
}
Settings settings = Settings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, 1000, TimeUnit.MILLISECONDS)
.put(IndexSettings.INDEX_GC_DELETES_SETTING, 1000, TimeUnit.MILLISECONDS)
.build();
client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
assertEquals(engine.getGcDeletesInMillis(), 1000);
@ -74,7 +66,7 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
settings = Settings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, "0ms")
.put(IndexSettings.INDEX_GC_DELETES_SETTING, "0ms")
.build();
client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
@ -82,7 +74,7 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
assertTrue(engine.config().isEnableGcDeletes());
settings = Settings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, 1000, TimeUnit.MILLISECONDS)
.put(IndexSettings.INDEX_GC_DELETES_SETTING, 1000, TimeUnit.MILLISECONDS)
.build();
client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
assertEquals(engine.getGcDeletesInMillis(), 1000);

View File

@ -61,8 +61,6 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.Index;
@ -85,7 +83,7 @@ import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.object.RootObjectMapper;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
@ -95,7 +93,6 @@ import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.test.DummyShardLock;
@ -168,7 +165,7 @@ public class InternalEngineTests extends ESTestCase {
codecName = "default";
}
defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
.put(IndexSettings.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
.put(EngineConfig.INDEX_CODEC_SETTING, codecName)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build()); // TODO randomize more settings
@ -260,19 +257,19 @@ public class InternalEngineTests extends ESTestCase {
}
protected InternalEngine createEngine(Store store, Path translogPath) {
return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy());
return createEngine(defaultSettings, store, translogPath, newMergePolicy());
}
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy), false);
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), false);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
, null, store, createSnapshotDeletionPolicy(), mergePolicy,
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), new Engine.EventListener() {
@Override
public void onFailedEngine(String reason, @Nullable Throwable t) {
@ -293,7 +290,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSegments() throws Exception {
try (Store store = createStore();
Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), NoMergePolicy.INSTANCE)) {
Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
List<Segment> segments = engine.segments(false);
assertThat(segments.isEmpty(), equalTo(true));
assertThat(engine.segmentsStats().getCount(), equalTo(0l));
@ -411,7 +408,7 @@ public class InternalEngineTests extends ESTestCase {
public void testVerboseSegments() throws Exception {
try (Store store = createStore();
Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), NoMergePolicy.INSTANCE)) {
Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
List<Segment> segments = engine.segments(true);
assertThat(segments.isEmpty(), equalTo(true));
@ -440,7 +437,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSegmentsWithMergeFlag() throws Exception {
try (Store store = createStore();
Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) {
Engine engine = createEngine(defaultSettings, store, createTempDir(), new TieredMergePolicy())) {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(newUid("1"), doc);
engine.index(index);
@ -770,7 +767,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSyncedFlush() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy()), false)) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
@ -797,7 +794,7 @@ public class InternalEngineTests extends ESTestCase {
final int iters = randomIntBetween(2, 5); // run this a couple of times to get some coverage
for (int i = 0; i < iters; i++) {
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogDocMergePolicy()), false)) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
@ -1027,7 +1024,7 @@ public class InternalEngineTests extends ESTestCase {
public void testForceMerge() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
@ -1466,7 +1463,7 @@ public class InternalEngineTests extends ESTestCase {
public void testEnableGcDeletes() throws Exception {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy()), false)) {
engine.config().setEnableGcDeletes(false);
// Add document
@ -1605,7 +1602,7 @@ public class InternalEngineTests extends ESTestCase {
IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(),
Settings.builder().put(defaultSettings.getSettings()).put(EngineConfig.INDEX_FORCE_NEW_TRANSLOG, true).build(),
Collections.emptyList());
engine = createEngine(indexSettings, store, primaryTranslogDir, new MergeSchedulerConfig(indexSettings), newMergePolicy());
engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy());
}
public void testTranslogReplayWithFailure() throws IOException {
@ -1939,7 +1936,7 @@ public class InternalEngineTests extends ESTestCase {
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexSettings()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(),
config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));

View File

@ -53,7 +53,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.DirectoryService;
@ -117,7 +117,7 @@ public class ShadowEngineTests extends ESTestCase {
codecName = "default";
}
defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
.put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
.put(IndexSettings.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
.put(EngineConfig.INDEX_CODEC_SETTING, codecName)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.build()); // TODO randomize more settings
@ -209,7 +209,7 @@ public class ShadowEngineTests extends ESTestCase {
}
protected ShadowEngine createShadowEngine(IndexSettings indexSettings, Store store) {
return new ShadowEngine(config(indexSettings, store, null, new MergeSchedulerConfig(indexSettings), null));
return new ShadowEngine(config(indexSettings, store, null, null));
}
protected InternalEngine createInternalEngine(IndexSettings indexSettings, Store store, Path translogPath) {
@ -217,14 +217,14 @@ public class ShadowEngineTests extends ESTestCase {
}
protected InternalEngine createInternalEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
return new InternalEngine(config(indexSettings, store, translogPath, new MergeSchedulerConfig(indexSettings), mergePolicy), true);
return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), true);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
, null, store, createSnapshotDeletionPolicy(), mergePolicy,
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(null, logger), new Engine.EventListener() {
@Override
public void onFailedEngine(String reason, @Nullable Throwable t) {

View File

@ -82,6 +82,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.search.stats.SearchSlowLog;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
@ -129,23 +130,23 @@ public class IndexShardTests extends ESSingleNodeTestCase {
public void testFlushOnDeleteSetting() throws Exception {
boolean initValue = randomBoolean();
createIndex("test", settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, initValue).build());
createIndex("test", settingsBuilder().put(IndexSettings.INDEX_FLUSH_ON_CLOSE, initValue).build());
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
IndexShard shard = test.getShardOrNull(0);
assertEquals(initValue, shard.isFlushOnClose());
assertEquals(initValue, shard.getIndexSettings().isFlushOnClose());
final boolean newValue = !initValue;
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, newValue).build()));
assertEquals(newValue, shard.isFlushOnClose());
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_FLUSH_ON_CLOSE, newValue).build()));
assertEquals(newValue, shard.getIndexSettings().isFlushOnClose());
try {
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, "FOOBAR").build()));
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_FLUSH_ON_CLOSE, "FOOBAR").build()));
fail("exception expected");
} catch (IllegalArgumentException ex) {
}
assertEquals(newValue, shard.isFlushOnClose());
assertEquals(newValue, shard.getIndexSettings().isFlushOnClose());
}
@ -719,7 +720,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexService test = indicesService.indexService("test");
IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
assertFalse(shard.shouldFlush());
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);
@ -735,7 +736,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
shard.getEngine().getTranslog().sync();
long size = shard.getEngine().getTranslog().sizeInBytes();
logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES))
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES))
.build()).get();
client().prepareDelete("test", "test", "2").get();
logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
@ -753,7 +754,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.getShardOrNull(0);
assertFalse(shard.shouldFlush());
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
assertFalse(shard.shouldFlush());
final AtomicBoolean running = new AtomicBoolean(true);
@ -1064,7 +1065,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true);
NodeServicesProvider indexServices = indexService.getIndexServices();
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, listeners);
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, indexService.getSearchSlowLog(), listeners);
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);

View File

@ -49,13 +49,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
@ -142,7 +142,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -247,7 +247,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -473,7 +473,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
@ -528,7 +528,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
.put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();

View File

@ -33,7 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@ -79,7 +79,7 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", "-1")
.put(MockEngineSupport.FLUSH_ON_CLOSE_RATIO, 0.0d) // never flush - always recover from translog
.put(IndexShard.INDEX_FLUSH_ON_CLOSE, false) // never flush - always recover from translog
.put(IndexSettings.INDEX_FLUSH_ON_CLOSE, false) // never flush - always recover from translog
));
ensureYellow();
@ -170,13 +170,13 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
/** Disables translog flushing for the specified index */
private static void disableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).build();
Settings settings = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
/** Enables translog flushing for the specified index */
private static void enableTranslogFlush(String index) {
Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)).build();
Settings settings = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)).build();
client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
}
}

View File

@ -30,8 +30,8 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
@ -153,7 +153,7 @@ public class FlushIT extends ESIntegTestCase {
createIndex("test");
client().admin().indices().prepareUpdateSettings("test").setSettings(
Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1))
Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1))
.get();
ensureGreen();
final AtomicBoolean stop = new AtomicBoolean(false);

View File

@ -31,8 +31,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.test.ESIntegTestCase;

View File

@ -44,8 +44,8 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;

View File

@ -31,7 +31,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;

View File

@ -36,7 +36,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;

View File

@ -101,9 +101,8 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Loading;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
@ -511,10 +510,10 @@ public abstract class ESIntegTestCase extends ESTestCase {
private static Settings.Builder setRandomIndexTranslogSettings(Random random, Settings.Builder builder) {
if (random.nextBoolean()) {
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
builder.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
}
if (random.nextBoolean()) {
builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
builder.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
}
if (random.nextBoolean()) {
builder.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durability.values()));