Merge pull request #15955 from s1monw/move_settings_to_index_level
Move all dynamic settings and their config classes to the index level
commit b333133183
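The change is easiest to read from the consumer side: dynamic values that were previously parsed and cached by individual components (EngineConfig, IndexShard, MergePolicyConfig, MergeSchedulerConfig) now live on the per-index IndexSettings instance. A minimal sketch of the resulting call pattern, illustrative only and not code from this commit (`engineConfig` stands for any EngineConfig reference):

    // Before this commit: the engine config owned its own copy of the value.
    long gcDeletesBefore = engineConfig.getGcDeletesInMillis();

    // After this commit: read through the index-level IndexSettings, as the hunks below do.
    IndexSettings indexSettings = engineConfig.getIndexSettings();
    long gcDeletesAfter = indexSettings.getGcDeletesInMillis();
    boolean flushOnClose = indexSettings.isFlushOnClose();
    MergePolicy mergePolicy = indexSettings.getMergePolicy();
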
@@ -66,13 +66,11 @@ import org.elasticsearch.common.util.ExtensionPoint;
 import org.elasticsearch.gateway.GatewayAllocator;
 import org.elasticsearch.gateway.PrimaryShardAllocator;
 import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.engine.EngineConfig;
 import org.elasticsearch.index.IndexingSlowLog;
 import org.elasticsearch.index.search.stats.SearchSlowLog;
 import org.elasticsearch.index.settings.IndexDynamicSettings;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.MergePolicyConfig;
-import org.elasticsearch.index.shard.MergeSchedulerConfig;
+import org.elasticsearch.index.MergePolicyConfig;
+import org.elasticsearch.index.MergeSchedulerConfig;
 import org.elasticsearch.index.store.IndexStore;
 import org.elasticsearch.indices.IndicesWarmer;
 import org.elasticsearch.indices.cache.request.IndicesRequestCache;
@@ -150,8 +148,8 @@ public class ClusterModule extends AbstractModule {
         registerIndexDynamicSetting(IndicesTTLService.INDEX_TTL_DISABLE_PURGE, Validator.EMPTY);
         registerIndexDynamicSetting(IndexSettings.INDEX_REFRESH_INTERVAL, Validator.TIME);
         registerIndexDynamicSetting(PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS, Validator.EMPTY);
-        registerIndexDynamicSetting(EngineConfig.INDEX_GC_DELETES_SETTING, Validator.TIME);
-        registerIndexDynamicSetting(IndexShard.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
+        registerIndexDynamicSetting(IndexSettings.INDEX_GC_DELETES_SETTING, Validator.TIME);
+        registerIndexDynamicSetting(IndexSettings.INDEX_FLUSH_ON_CLOSE, Validator.BOOLEAN);
         registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_WARN, Validator.TIME);
         registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_INFO, Validator.TIME);
         registerIndexDynamicSetting(IndexingSlowLog.INDEX_INDEXING_SLOWLOG_THRESHOLD_INDEX_DEBUG, Validator.TIME);
@@ -178,7 +176,7 @@ public class ClusterModule extends AbstractModule {
         registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, Validator.DOUBLE_GTE_2);
         registerIndexDynamicSetting(MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT, Validator.NON_NEGATIVE_DOUBLE);
         registerIndexDynamicSetting(MergePolicyConfig.INDEX_COMPOUND_FORMAT, Validator.EMPTY);
-        registerIndexDynamicSetting(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
+        registerIndexDynamicSetting(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, Validator.BYTES_SIZE);
         registerIndexDynamicSetting(IndexSettings.INDEX_TRANSLOG_DURABILITY, Validator.EMPTY);
         registerIndexDynamicSetting(IndicesWarmer.INDEX_WARMER_ENABLED, Validator.EMPTY);
         registerIndexDynamicSetting(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED, Validator.BOOLEAN);
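Each registration above pairs a dynamic setting key with a Validator that vets an update before it is applied. A hedged sketch of the same pattern for a made-up setting (the keys are hypothetical, not part of this commit; Validator.TIME and Validator.BOOLEAN are the validators used above):

    // Hypothetical keys following the registration pattern shown in ClusterModule.
    registerIndexDynamicSetting("index.my_module.timeout", Validator.TIME);
    registerIndexDynamicSetting("index.my_module.enabled", Validator.BOOLEAN);
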
@@ -61,6 +61,7 @@ import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.query.ParsedQuery;
 import org.elasticsearch.index.query.QueryShardContext;
+import org.elasticsearch.index.search.stats.SearchSlowLog;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
 import org.elasticsearch.index.shard.IndexShard;
@@ -108,6 +109,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
     private final IndexingOperationListener[] listeners;
     private volatile AsyncRefreshTask refreshTask;
     private final AsyncTranslogFSync fsyncTask;
+    private final SearchSlowLog searchSlowLog;

     public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
                         SimilarityService similarityService,
@@ -151,6 +153,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
             this.fsyncTask = null;
         }
         this.refreshTask = new AsyncRefreshTask(this);
+        searchSlowLog = new SearchSlowLog(indexSettings.getSettings());
     }

     public int numberOfShards() {
@@ -313,9 +316,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
             (primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
         store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId)));
         if (useShadowEngine(primary, indexSettings)) {
-            indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider); // no indexing listeners - shadow engines don't index
+            indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog); // no indexing listeners - shadow engines don't index
         } else {
-            indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, listeners);
+            indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, listeners);
         }
         eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
         eventListener.afterIndexShardCreated(indexShard);
@@ -414,6 +417,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
         return nodeServicesProvider.getThreadPool();
     }
+
+    public SearchSlowLog getSearchSlowLog() {
+        return searchSlowLog;
+    }

     private class StoreCloseListener implements Store.OnClose {
         private final ShardId shardId;
         private final boolean ownsShard;
@@ -562,9 +569,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
         final Settings settings = indexSettings.getSettings();
         for (final IndexShard shard : this.shards.values()) {
             try {
-                shard.onRefreshSettings(settings);
+                shard.onSettingsChanged();
             } catch (Exception e) {
-                logger.warn("[{}] failed to refresh shard settings", e, shard.shardId().id());
+                logger.warn("[{}] failed to notify shard about setting change", e, shard.shardId().id());
             }
         }
         try {
@@ -577,6 +584,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
         } catch (Exception e) {
             logger.warn("failed to refresh slowlog settings", e);
         }
+
+        try {
+            searchSlowLog.onRefreshSettings(settings); // this will be refactored soon anyway so duplication is ok here
+        } catch (Exception e) {
+            logger.warn("failed to refresh slowlog settings", e);
+        }
         if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
             rescheduleRefreshTasks();
         }
@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.index;

+import org.apache.lucene.index.MergePolicy;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.ParseFieldMatcher;
@@ -25,11 +26,11 @@ import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.mapper.internal.AllFieldMapper;
 import org.elasticsearch.index.translog.Translog;
-import org.elasticsearch.threadpool.ThreadPool;

 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,6 +60,19 @@ public final class IndexSettings {
     public static final String INDEX_TRANSLOG_DURABILITY = "index.translog.durability";
     public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
     public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
+    public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
+    public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
+
+    /**
+     * Index setting to control if a flush is executed before engine is closed
+     * This setting is realtime updateable.
+     */
+    public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
+    /**
+     * Index setting to enable / disable deletes garbage collection.
+     * This setting is realtime updateable
+     */
+    public static final String INDEX_GC_DELETES_SETTING = "index.gc_deletes";

     private final String uuid;
     private final List<Consumer<Settings>> updateListeners;
@@ -82,7 +96,12 @@ public final class IndexSettings {
     private volatile Translog.Durability durability;
     private final TimeValue syncInterval;
     private volatile TimeValue refreshInterval;
+    private volatile ByteSizeValue flushThresholdSize;
+    private volatile boolean flushOnClose = true;
+    private final MergeSchedulerConfig mergeSchedulerConfig;
+    private final MergePolicyConfig mergePolicyConfig;

+    private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();


     /**
@@ -165,6 +184,11 @@ public final class IndexSettings {
         this.durability = getFromSettings(settings, Translog.Durability.REQUEST);
         syncInterval = settings.getAsTime(INDEX_TRANSLOG_SYNC_INTERVAL, TimeValue.timeValueSeconds(5));
         refreshInterval = settings.getAsTime(INDEX_REFRESH_INTERVAL, DEFAULT_REFRESH_INTERVAL);
+        flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
+        flushOnClose = settings.getAsBoolean(IndexSettings.INDEX_FLUSH_ON_CLOSE, true);
+        mergeSchedulerConfig = new MergeSchedulerConfig(settings);
+        gcDeletesInMillis = settings.getAsTime(IndexSettings.INDEX_GC_DELETES_SETTING, DEFAULT_GC_DELETES).getMillis();
+        this.mergePolicyConfig = new MergePolicyConfig(logger, settings);
         assert indexNameMatcher.test(indexMetaData.getIndex());
     }

@@ -360,13 +384,88 @@ public final class IndexSettings {
             logger.info("updating refresh_interval from [{}] to [{}]", this.refreshInterval, refreshInterval);
             this.refreshInterval = refreshInterval;
         }
+
+        ByteSizeValue flushThresholdSize = settings.getAsBytesSize(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize);
+        if (!flushThresholdSize.equals(this.flushThresholdSize)) {
+            logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize);
+            this.flushThresholdSize = flushThresholdSize;
+        }
+
+        final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose);
+        if (flushOnClose != this.flushOnClose) {
+            logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, this.flushOnClose, flushOnClose);
+            this.flushOnClose = flushOnClose;
+        }
+
+        final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
+        if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
+            logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
+            mergeSchedulerConfig.setMaxThreadCount(maxThreadCount);
+        }
+
+        final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
+        if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
+            logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
+            mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
+        }
+
+        final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle());
+        if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) {
+            logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle);
+            mergeSchedulerConfig.setAutoThrottle(autoThrottle);
+        }
+
+        long gcDeletesInMillis = settings.getAsTime(IndexSettings.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(this.gcDeletesInMillis)).getMillis();
+        if (gcDeletesInMillis != this.gcDeletesInMillis) {
+            logger.info("updating {} from [{}] to [{}]", IndexSettings.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(this.gcDeletesInMillis), TimeValue.timeValueMillis(gcDeletesInMillis));
+            this.gcDeletesInMillis = gcDeletesInMillis;
+        }
+
+        mergePolicyConfig.onRefreshSettings(settings);
     }

     /**
      * Returns the translog sync interval. This is the interval in which the transaction log is asynchronously fsynced unless
      * the transaction log is fsyncing on every operations
      */
     public TimeValue getTranslogSyncInterval() {
         return syncInterval;
     }
+
+    /**
+     * Returns this interval in which the shards of this index are asynchronously refreshed. <tt>-1</tt> means async refresh is disabled.
+     */
+    public TimeValue getRefreshInterval() {
+        return refreshInterval;
+    }
+
+    /**
+     * Returns the transaction log threshold size when to forcefully flush the index and clear the transaction log.
+     */
+    public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }
+
+    /**
+     * Returns <code>true</code> iff this index should be flushed on close. Default is <code>true</code>
+     */
+    public boolean isFlushOnClose() { return flushOnClose; }
+
+    /**
+     * Returns the {@link MergeSchedulerConfig}
+     */
+    public MergeSchedulerConfig getMergeSchedulerConfig() { return mergeSchedulerConfig; }
+
+    /**
+     * Returns the GC deletes cycle in milliseconds.
+     */
+    public long getGcDeletesInMillis() {
+        return gcDeletesInMillis;
+    }
+
+    /**
+     * Returns the merge policy that should be used for this index.
+     */
+    public MergePolicy getMergePolicy() {
+        return mergePolicyConfig.getMergePolicy();
+    }
+
 }
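The new updateSettings body above is the single funnel for dynamic updates: each block reads the incoming value with the current value as default, logs the transition, and stores the new value. A minimal sketch of driving it end to end, modeled on the test changes near the end of this diff (the index name "foo" and the values are arbitrary):

    Settings update = Settings.builder()
            .put(IndexSettings.INDEX_GC_DELETES_SETTING, "30s")
            .put(IndexSettings.INDEX_FLUSH_ON_CLOSE, false)
            .build();
    client().admin().indices().prepareUpdateSettings("foo").setSettings(update).get();
    // IndexSettings applies the new values and IndexService then calls
    // shard.onSettingsChanged() so each engine re-reads them.
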
@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard;
+package org.elasticsearch.index;

 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.NoMergePolicy;
@@ -33,61 +33,61 @@ import org.elasticsearch.common.unit.ByteSizeValue;
  * where the index data is stored, and are immutable up to delete markers.
  * Segments are, periodically, merged into larger segments to keep the
  * index size at bay and expunge deletes.
- * 
+ *
  * <p>
  * Merges select segments of approximately equal size, subject to an allowed
  * number of segments per tier. The merge policy is able to merge
  * non-adjacent segments, and separates how many segments are merged at once from how many
  * segments are allowed per tier. It also does not over-merge (i.e., cascade merges).
- * 
+ *
  * <p>
  * All merge policy settings are <b>dynamic</b> and can be updated on a live index.
  * The merge policy has the following settings:
- * 
+ *
  * <ul>
 * <li><code>index.merge.policy.expunge_deletes_allowed</code>:
- * 
+ *
 * When expungeDeletes is called, we only merge away a segment if its delete
 * percentage is over this threshold. Default is <code>10</code>.
- * 
+ *
 * <li><code>index.merge.policy.floor_segment</code>:
- * 
+ *
 * Segments smaller than this are "rounded up" to this size, i.e. treated as
 * equal (floor) size for merge selection. This is to prevent frequent
 * flushing of tiny segments, thus preventing a long tail in the index. Default
 * is <code>2mb</code>.
- * 
+ *
 * <li><code>index.merge.policy.max_merge_at_once</code>:
- * 
+ *
 * Maximum number of segments to be merged at a time during "normal" merging.
 * Default is <code>10</code>.
- * 
+ *
 * <li><code>index.merge.policy.max_merge_at_once_explicit</code>:
- * 
+ *
 * Maximum number of segments to be merged at a time, during force merge or
 * expungeDeletes. Default is <code>30</code>.
- * 
+ *
 * <li><code>index.merge.policy.max_merged_segment</code>:
- * 
+ *
 * Maximum sized segment to produce during normal merging (not explicit
 * force merge). This setting is approximate: the estimate of the merged
 * segment size is made by summing sizes of to-be-merged segments
 * (compensating for percent deleted docs). Default is <code>5gb</code>.
- * 
+ *
 * <li><code>index.merge.policy.segments_per_tier</code>:
- * 
+ *
 * Sets the allowed number of segments per tier. Smaller values mean more
 * merging but fewer segments. Default is <code>10</code>. Note, this value needs to be
 * >= than the <code>max_merge_at_once</code> otherwise you'll force too many merges to
 * occur.
- * 
+ *
 * <li><code>index.merge.policy.reclaim_deletes_weight</code>:
- * 
+ *
 * Controls how aggressively merges that reclaim more deletions are favored.
 * Higher values favor selecting merges that reclaim deletions. A value of
 * <code>0.0</code> means deletions don't impact merge selection. Defaults to <code>2.0</code>.
 * </ul>
- * 
+ *
 * <p>
 * For normal merging, the policy first computes a "budget" of how many
 * segments are allowed to be in the index. If the index is over-budget,
@@ -97,13 +97,13 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 * smallest seg), total merge size and pct deletes reclaimed, so that
 * merges with lower skew, smaller size and those reclaiming more deletes,
 * are favored.
- * 
+ *
 * <p>
 * If a merge will produce a segment that's larger than
 * <code>max_merged_segment</code> then the policy will merge fewer segments (down to
 * 1 at once, if that one has deletions) to keep the segment size under
 * budget.
- * 
+ *
 * <p>
 * Note, this can mean that for large shards that holds many gigabytes of
 * data, the default of <code>max_merged_segment</code> (<code>5gb</code>) can cause for many
@@ -138,7 +138,7 @@ public final class MergePolicyConfig {
     public static final String INDEX_MERGE_ENABLED = "index.merge.enabled";


-    public MergePolicyConfig(ESLogger logger, Settings indexSettings) {
+    MergePolicyConfig(ESLogger logger, Settings indexSettings) {
         this.logger = logger;
         this.noCFSRatio = parseNoCFSRatio(indexSettings.get(INDEX_COMPOUND_FORMAT, Double.toString(TieredMergePolicy.DEFAULT_NO_CFS_RATIO)));
         double forceMergeDeletesPctAllowed = indexSettings.getAsDouble("index.merge.policy.expunge_deletes_allowed", DEFAULT_EXPUNGE_DELETES_ALLOWED); // percentage
@@ -180,11 +180,11 @@ public final class MergePolicyConfig {
         return maxMergeAtOnce;
     }

-    public MergePolicy getMergePolicy() {
+    MergePolicy getMergePolicy() {
         return mergesEnabled ? mergePolicy : NoMergePolicy.INSTANCE;
     }

-    public void onRefreshSettings(Settings settings) {
+    void onRefreshSettings(Settings settings) {
         final double oldExpungeDeletesPctAllowed = mergePolicy.getForceMergeDeletesPctAllowed();
         final double expungeDeletesPctAllowed = settings.getAsDouble(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED, oldExpungeDeletesPctAllowed);
         if (expungeDeletesPctAllowed != oldExpungeDeletesPctAllowed) {
@@ -243,7 +243,7 @@ public final class MergePolicyConfig {
         }
     }

-    public static double parseNoCFSRatio(String noCFSRatio) {
+    private static double parseNoCFSRatio(String noCFSRatio) {
         noCFSRatio = noCFSRatio.trim();
         if (noCFSRatio.equalsIgnoreCase("true")) {
             return 1.0d;
@@ -262,7 +262,7 @@ public final class MergePolicyConfig {
         }
     }

-    public static String formatNoCFSRatio(double ratio) {
+    private static String formatNoCFSRatio(double ratio) {
         if (ratio == 1.0) {
             return Boolean.TRUE.toString();
         } else if (ratio == 0.0) {
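All of the index.merge.policy.* keys documented above remain dynamic after the move, so they can still be changed on a live index. A hedged sketch, reusing the update-settings pattern from the tests in this diff (the index name and values are illustrative only):

    Settings update = Settings.builder()
            .put("index.merge.policy.segments_per_tier", 5.0)
            .put("index.merge.policy.max_merged_segment", "2gb")
            .build();
    client().admin().indices().prepareUpdateSettings("my_index").setSettings(update).get();
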
@@ -17,7 +17,7 @@
  * under the License.
  */

-package org.elasticsearch.index.shard;
+package org.elasticsearch.index;

 import org.apache.lucene.index.ConcurrentMergeScheduler;
 import org.elasticsearch.common.settings.Settings;
@@ -60,8 +60,7 @@ public final class MergeSchedulerConfig {
     private volatile int maxThreadCount;
     private volatile int maxMergeCount;

-    public MergeSchedulerConfig(IndexSettings indexSettings) {
-        final Settings settings = indexSettings.getSettings();
+    MergeSchedulerConfig(Settings settings) {
         maxThreadCount = settings.getAsInt(MAX_THREAD_COUNT, Math.max(1, Math.min(4, EsExecutors.boundedNumberOfProcessors(settings) / 2)));
         maxMergeCount = settings.getAsInt(MAX_MERGE_COUNT, maxThreadCount + 5);
         this.autoThrottle = settings.getAsBoolean(AUTO_THROTTLE, true);
@@ -78,7 +77,7 @@ public final class MergeSchedulerConfig {
     /**
      * Enables / disables auto throttling on the {@link ConcurrentMergeScheduler}
      */
-    public void setAutoThrottle(boolean autoThrottle) {
+    void setAutoThrottle(boolean autoThrottle) {
         this.autoThrottle = autoThrottle;
     }

@@ -93,7 +92,7 @@ public final class MergeSchedulerConfig {
      * Expert: directly set the maximum number of merge threads and
      * simultaneous merges allowed.
      */
-    public void setMaxThreadCount(int maxThreadCount) {
+    void setMaxThreadCount(int maxThreadCount) {
         this.maxThreadCount = maxThreadCount;
     }

@@ -108,7 +107,7 @@ public final class MergeSchedulerConfig {
      *
      * Expert: set the maximum number of simultaneous merges allowed.
      */
-    public void setMaxMergeCount(int maxMergeCount) {
+    void setMaxMergeCount(int maxMergeCount) {
         this.maxMergeCount = maxMergeCount;
     }
 }
@@ -36,7 +36,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
-import org.elasticsearch.index.shard.MergeSchedulerConfig;
+import org.elasticsearch.index.MergeSchedulerConfig;
 import org.elasticsearch.index.shard.ShardId;

 import java.io.IOException;
@@ -67,8 +67,8 @@ class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
     private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
     private final MergeSchedulerConfig config;

-    public ElasticsearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings, MergeSchedulerConfig config) {
-        this.config = config;
+    public ElasticsearchConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
+        this.config = indexSettings.getMergeSchedulerConfig();
         this.shardId = shardId;
         this.indexSettings = indexSettings.getSettings();
         this.logger = Loggers.getLogger(getClass(), this.indexSettings, shardId);
@@ -31,7 +31,6 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.shard.MergeSchedulerConfig;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
 import org.elasticsearch.index.store.Store;
@@ -39,8 +38,6 @@ import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.threadpool.ThreadPool;

-import java.util.concurrent.TimeUnit;
-
 /*
  * Holds all the configuration that is used to create an {@link Engine}.
  * Once {@link Engine} has been created with this object, changes to this
@@ -51,7 +48,6 @@ public final class EngineConfig {
     private final TranslogRecoveryPerformer translogRecoveryPerformer;
     private final IndexSettings indexSettings;
     private final ByteSizeValue indexingBufferSize;
-    private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
     private volatile boolean enableGcDeletes = true;
     private final TimeValue flushMergesAfter;
     private final String codecName;
@@ -60,7 +56,6 @@ public final class EngineConfig {
     private final Store store;
     private final SnapshotDeletionPolicy deletionPolicy;
     private final MergePolicy mergePolicy;
-    private final MergeSchedulerConfig mergeSchedulerConfig;
     private final Analyzer analyzer;
     private final Similarity similarity;
     private final CodecService codecService;
@@ -69,12 +64,6 @@ public final class EngineConfig {
     private final QueryCache queryCache;
     private final QueryCachingPolicy queryCachingPolicy;

-    /**
-     * Index setting to enable / disable deletes garbage collection.
-     * This setting is realtime updateable
-     */
-    public static final String INDEX_GC_DELETES_SETTING = "index.gc_deletes";
-
     /**
      * Index setting to change the low level lucene codec used for writing new segments.
      * This setting is <b>not</b> realtime updateable.
@@ -84,8 +73,6 @@ public final class EngineConfig {
     /** if set to true the engine will start even if the translog id in the commit point can not be found */
     public static final String INDEX_FORCE_NEW_TRANSLOG = "index.engine.force_new_translog";

-    public static final TimeValue DEFAULT_GC_DELETES = TimeValue.timeValueSeconds(60);
-
     private static final String DEFAULT_CODEC_NAME = "default";
     private TranslogConfig translogConfig;
     private boolean create = false;
@@ -95,7 +82,7 @@ public final class EngineConfig {
      */
     public EngineConfig(ShardId shardId, ThreadPool threadPool,
                         IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
-                        MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
+                        MergePolicy mergePolicy,Analyzer analyzer,
                         Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
                         TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter) {
         this.shardId = shardId;
@@ -106,7 +93,6 @@ public final class EngineConfig {
         this.store = store;
         this.deletionPolicy = deletionPolicy;
         this.mergePolicy = mergePolicy;
-        this.mergeSchedulerConfig = mergeSchedulerConfig;
         this.analyzer = analyzer;
         this.similarity = similarity;
         this.codecService = codecService;
@@ -116,7 +102,6 @@ public final class EngineConfig {
         // there are not too many shards allocated to this node. Instead, IndexingMemoryController periodically checks
         // and refreshes the most heap-consuming shards when total indexing heap usage across all shards is too high:
         indexingBufferSize = new ByteSizeValue(256, ByteSizeUnit.MB);
-        gcDeletesInMillis = settings.getAsTime(INDEX_GC_DELETES_SETTING, EngineConfig.DEFAULT_GC_DELETES).millis();
         this.translogRecoveryPerformer = translogRecoveryPerformer;
         this.forceNewTranslog = settings.getAsBoolean(INDEX_FORCE_NEW_TRANSLOG, false);
         this.queryCache = queryCache;
@@ -146,19 +131,12 @@ public final class EngineConfig {
         return indexingBufferSize;
     }

-    /**
-     * Returns the GC deletes cycle in milliseconds.
-     */
-    public long getGcDeletesInMillis() {
-        return gcDeletesInMillis;
-    }
-
     /**
      * Returns <code>true</code> iff delete garbage collection in the engine should be enabled. This setting is updateable
      * in realtime and forces a volatile read. Consumers can safely read this value directly go fetch it's latest value. The default is <code>true</code>
      * <p>
      * Engine GC deletion if enabled collects deleted documents from in-memory realtime data structures after a certain amount of
-     * time ({@link #getGcDeletesInMillis()} if enabled. Before deletes are GCed they will cause re-adding the document that was deleted
+     * time ({@link IndexSettings#getGcDeletesInMillis()} if enabled. Before deletes are GCed they will cause re-adding the document that was deleted
      * to fail.
      * </p>
      */
@@ -218,13 +196,6 @@ public final class EngineConfig {
         return mergePolicy;
     }

-    /**
-     * Returns the {@link MergeSchedulerConfig}
-     */
-    public MergeSchedulerConfig getMergeSchedulerConfig() {
-        return mergeSchedulerConfig;
-    }
-
     /**
      * Returns a listener that should be called on engine failure
      */
@@ -258,13 +229,6 @@ public final class EngineConfig {
         return similarity;
     }

-    /**
-     * Sets the GC deletes cycle in milliseconds.
-     */
-    public void setGcDeletesInMillis(long gcDeletesInMillis) {
-        this.gcDeletesInMillis = gcDeletesInMillis;
-    }
-
     /**
      * Returns the {@link org.elasticsearch.index.shard.TranslogRecoveryPerformer} for this engine. This class is used
      * to apply transaction log operations to the engine. It encapsulates all the logic to transfer the translog entry into
@@ -57,14 +57,12 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.math.MathUtils;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
-import org.elasticsearch.index.shard.MergeSchedulerConfig;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
 import org.elasticsearch.index.translog.Translog;
@@ -136,7 +134,7 @@ public class InternalEngine extends Engine {
         try {
             this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
             this.warmer = engineConfig.getWarmer();
-            mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings(), engineConfig.getMergeSchedulerConfig());
+            mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
             this.dirtyLocks = new Object[Runtime.getRuntime().availableProcessors() * 10]; // we multiply it to have enough...
             for (int i = 0; i < dirtyLocks.length; i++) {
                 dirtyLocks[i] = new Object();
@@ -370,7 +368,7 @@ public class InternalEngine extends Engine {
                 deleted = currentVersion == Versions.NOT_FOUND;
             } else {
                 deleted = versionValue.delete();
-                if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
+                if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
                     currentVersion = Versions.NOT_FOUND; // deleted, and GC
                 } else {
                     currentVersion = versionValue.version();
@@ -436,7 +434,7 @@ public class InternalEngine extends Engine {
     private void maybePruneDeletedTombstones() {
         // It's expensive to prune because we walk the deletes map acquiring dirtyLock for each uid so we only do it
         // every 1/4 of gcDeletesInMillis:
-        if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > engineConfig.getGcDeletesInMillis() * 0.25) {
+        if (engineConfig.isEnableGcDeletes() && engineConfig.getThreadPool().estimatedTimeInMillis() - lastDeleteVersionPruneTimeMSec > getGcDeletesInMillis() * 0.25) {
             pruneDeletedTombstones();
         }
     }
@@ -452,7 +450,7 @@ public class InternalEngine extends Engine {
                 deleted = currentVersion == Versions.NOT_FOUND;
             } else {
                 deleted = versionValue.delete();
-                if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) {
+                if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) {
                     currentVersion = Versions.NOT_FOUND; // deleted, and GC
                 } else {
                     currentVersion = versionValue.version();
@@ -701,7 +699,7 @@ public class InternalEngine extends Engine {
                 // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
                 VersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
                 if (versionValue != null) {
-                    if (timeMSec - versionValue.time() > engineConfig.getGcDeletesInMillis()) {
+                    if (timeMSec - versionValue.time() > getGcDeletesInMillis()) {
                         versionMap.removeTombstoneUnderLock(uid);
                     }
                 }
@@ -1072,7 +1070,7 @@ public class InternalEngine extends Engine {
     }

     long getGcDeletesInMillis() {
-        return engineConfig.getGcDeletesInMillis();
+        return engineConfig.getIndexSettings().getGcDeletesInMillis();
     }

     LiveIndexWriterConfig getCurrentIndexWriterConfig() {
@@ -1083,8 +1081,8 @@ public class InternalEngine extends Engine {
         private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
         private final AtomicBoolean isThrottling = new AtomicBoolean();

-        EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings, MergeSchedulerConfig config) {
-            super(shardId, indexSettings, config);
+        EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) {
+            super(shardId, indexSettings);
         }

         @Override
@@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit;

 /**
  */
-public final class SearchSlowLog{
+public final class SearchSlowLog {

     private boolean reformat;

@@ -62,7 +62,7 @@ public final class SearchSlowLog{
     public static final String INDEX_SEARCH_SLOWLOG_REFORMAT = INDEX_SEARCH_SLOWLOG_PREFIX + ".reformat";
     public static final String INDEX_SEARCH_SLOWLOG_LEVEL = INDEX_SEARCH_SLOWLOG_PREFIX + ".level";

-    SearchSlowLog(Settings indexSettings) {
+    public SearchSlowLog(Settings indexSettings) {

         this.reformat = indexSettings.getAsBoolean(INDEX_SEARCH_SLOWLOG_REFORMAT, true);

@@ -109,7 +109,7 @@ public final class SearchSlowLog{
         }
     }

-    synchronized void onRefreshSettings(Settings settings) {
+    public void onRefreshSettings(Settings settings) {
         long queryWarnThreshold = settings.getAsTime(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN, TimeValue.timeValueNanos(this.queryWarnThreshold)).nanos();
         if (queryWarnThreshold != this.queryWarnThreshold) {
             this.queryWarnThreshold = queryWarnThreshold;
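SearchSlowLog keeps its per-index threshold settings; only its construction and refresh wiring change in this commit. A hedged sketch of adjusting a search slow-log threshold at runtime, assuming the threshold constant is public like the REFORMAT and LEVEL constants shown above (index name and values are arbitrary):

    Settings update = Settings.builder()
            .put(SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN, "10s")
            .put(SearchSlowLog.INDEX_SEARCH_SLOWLOG_REFORMAT, false)
            .build();
    client().admin().indices().prepareUpdateSettings("my_index").setSettings(update).get();
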
@@ -41,8 +41,8 @@ public final class ShardSearchStats {
     private final CounterMetric openContexts = new CounterMetric();
     private volatile Map<String, StatsHolder> groupsStats = emptyMap();

-    public ShardSearchStats(Settings indexSettings) {
-        this.slowLogSearchService = new SearchSlowLog(indexSettings);
+    public ShardSearchStats(SearchSlowLog searchSlowLog) {
+        this.slowLogSearchService = searchSlowLog;
     }

     /**
@@ -47,7 +47,6 @@ import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.Callback;
@@ -92,6 +91,7 @@ import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
 import org.elasticsearch.index.query.QueryShardContext;
 import org.elasticsearch.index.recovery.RecoveryStats;
 import org.elasticsearch.index.refresh.RefreshStats;
+import org.elasticsearch.index.search.stats.SearchSlowLog;
 import org.elasticsearch.index.search.stats.SearchStats;
 import org.elasticsearch.index.search.stats.ShardSearchStats;
 import org.elasticsearch.index.similarity.SimilarityService;
@@ -141,7 +141,6 @@ public class IndexShard extends AbstractIndexShardComponent {
     private final MapperService mapperService;
     private final IndexCache indexCache;
     private final Store store;
-    private final MergeSchedulerConfig mergeSchedulerConfig;
     private final InternalIndexingStats internalIndexingStats;
     private final ShardSearchStats searchService;
     private final ShardGetService getService;
@@ -161,7 +160,6 @@ public class IndexShard extends AbstractIndexShardComponent {
     private final SimilarityService similarityService;
     private final EngineConfig engineConfig;
     private final TranslogConfig translogConfig;
-    private final MergePolicyConfig mergePolicyConfig;
     private final IndicesQueryCache indicesQueryCache;
     private final IndexEventListener indexEventListener;
     private final IndexSettings idxSettings;
@@ -188,15 +186,6 @@ public class IndexShard extends AbstractIndexShardComponent {
     private final MeanMetric flushMetric = new MeanMetric();

     private final ShardEventListener shardEventListener = new ShardEventListener();
-    private volatile boolean flushOnClose = true;
-    private volatile ByteSizeValue flushThresholdSize;
-
-    /**
-     * Index setting to control if a flush is executed before engine is closed
-     * This setting is realtime updateable.
-     */
-    public static final String INDEX_FLUSH_ON_CLOSE = "index.flush_on_close";
-    public static final String INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";

     private final ShardPath path;

@@ -215,7 +204,7 @@ public class IndexShard extends AbstractIndexShardComponent {
     public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
                       MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
                       @Nullable EngineFactory engineFactory,
-                      IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, IndexingOperationListener... listeners) {
+                      IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, IndexingOperationListener... listeners) {
         super(shardId, indexSettings);
         final Settings settings = indexSettings.getSettings();
         this.idxSettings = indexSettings;
@@ -227,7 +216,6 @@ public class IndexShard extends AbstractIndexShardComponent {
         this.engineFactory = engineFactory == null ? new InternalEngineFactory() : engineFactory;
         this.store = store;
         this.indexEventListener = indexEventListener;
-        this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings);
         this.threadPool = provider.getThreadPool();
         this.mapperService = mapperService;
         this.indexCache = indexCache;
@@ -237,7 +225,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
         this.getService = new ShardGetService(indexSettings, this, mapperService);
         this.termVectorsService = provider.getTermVectorsService();
-        this.searchService = new ShardSearchStats(settings);
+        this.searchService = new ShardSearchStats(slowLog);
         this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
         this.indicesQueryCache = provider.getIndicesQueryCache();
         this.shardQueryCache = new ShardRequestCache(shardId, indexSettings);
@@ -245,9 +233,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         this.indexFieldDataService = indexFieldDataService;
         this.shardBitsetFilterCache = new ShardBitsetFilterCache(shardId, indexSettings);
         state = IndexShardState.CREATED;
-        this.flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, true);
         this.path = path;
-        this.mergePolicyConfig = new MergePolicyConfig(logger, settings);
         /* create engine config */
         logger.debug("state: [CREATED]");

@@ -264,7 +250,6 @@ public class IndexShard extends AbstractIndexShardComponent {
         }

         this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
-        this.flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB));
         this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId);
         this.provider = provider;
         this.searcherWrapper = indexSearcherWrapper;
@@ -817,7 +802,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         } finally {
             final Engine engine = this.currentEngineReference.getAndSet(null);
             try {
-                if (engine != null && flushEngine && this.flushOnClose) {
+                if (engine != null && flushEngine && indexSettings.isFlushOnClose()) {
                     engine.flushAndClose();
                 }
             } finally { // playing safe here and close the engine even if the above succeeds - close can be called multiple times
@@ -1048,10 +1033,6 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }

-    public final boolean isFlushOnClose() {
-        return flushOnClose;
-    }
-
     /**
      * Deletes the shards metadata state. This method can only be executed if the shard is not active.
      *
@@ -1093,7 +1074,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         if (engine != null) {
             try {
                 Translog translog = engine.getTranslog();
-                return translog.sizeInBytes() > flushThresholdSize.bytes();
+                return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().bytes();
             } catch (AlreadyClosedException | EngineClosedException ex) {
                 // that's fine we are already close - no need to flush
             }
@@ -1101,57 +1082,10 @@ public class IndexShard extends AbstractIndexShardComponent {
         return false;
     }

-    public void onRefreshSettings(Settings settings) {
-        boolean change = false;
-        synchronized (mutex) {
-            if (state() == IndexShardState.CLOSED) { // no need to update anything if we are closed
-                return;
-            }
-            ByteSizeValue flushThresholdSize = settings.getAsBytesSize(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, this.flushThresholdSize);
-            if (!flushThresholdSize.equals(this.flushThresholdSize)) {
-                logger.info("updating flush_threshold_size from [{}] to [{}]", this.flushThresholdSize, flushThresholdSize);
-                this.flushThresholdSize = flushThresholdSize;
-            }
-
-            final EngineConfig config = engineConfig;
-            final boolean flushOnClose = settings.getAsBoolean(INDEX_FLUSH_ON_CLOSE, this.flushOnClose);
-            if (flushOnClose != this.flushOnClose) {
-                logger.info("updating {} from [{}] to [{}]", INDEX_FLUSH_ON_CLOSE, this.flushOnClose, flushOnClose);
-                this.flushOnClose = flushOnClose;
-            }
-
-            long gcDeletesInMillis = settings.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis())).millis();
-            if (gcDeletesInMillis != config.getGcDeletesInMillis()) {
-                logger.info("updating {} from [{}] to [{}]", EngineConfig.INDEX_GC_DELETES_SETTING, TimeValue.timeValueMillis(config.getGcDeletesInMillis()), TimeValue.timeValueMillis(gcDeletesInMillis));
-                config.setGcDeletesInMillis(gcDeletesInMillis);
-                change = true;
-            }
-
-            final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
-            if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
-                logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
-                mergeSchedulerConfig.setMaxThreadCount(maxThreadCount);
-                change = true;
-            }
-
-            final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
-            if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
-                logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
-                mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
-                change = true;
-            }
-
-            final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle());
-            if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) {
-                logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle);
-                mergeSchedulerConfig.setAutoThrottle(autoThrottle);
-                change = true;
-            }
-        }
-        mergePolicyConfig.onRefreshSettings(settings);
-        searchService.onRefreshSettings(settings);
-        if (change) {
-            getEngine().onSettingsChanged();
+    public void onSettingsChanged() {
+        Engine engineOrNull = getEngineOrNull();
+        if (engineOrNull != null) {
+            engineOrNull.onSettingsChanged();
         }
     }

@@ -1431,7 +1365,7 @@ public class IndexShard extends AbstractIndexShardComponent {
         };
         final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel);
         return new EngineConfig(shardId,
-            threadPool, indexSettings, engineWarmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
+            threadPool, indexSettings, engineWarmer, store, deletionPolicy, indexSettings.getMergePolicy(),
             mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
             idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
     }
@@ -29,6 +29,7 @@ import org.elasticsearch.index.engine.EngineFactory;
 import org.elasticsearch.index.fielddata.IndexFieldDataService;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.merge.MergeStats;
+import org.elasticsearch.index.search.stats.SearchSlowLog;
 import org.elasticsearch.index.similarity.SimilarityService;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.TranslogStats;
@@ -44,8 +45,8 @@ import java.io.IOException;
 public final class ShadowIndexShard extends IndexShard {

     public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory,
-                            IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider) throws IOException {
-        super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider);
+                            IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider, SearchSlowLog searchSlowLog) throws IOException {
+        super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider, searchSlowLog);
     }

     /**
@@ -22,7 +22,7 @@ package org.elasticsearch.action.admin.indices.segments;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.Segment;
-import org.elasticsearch.index.shard.MergePolicyConfig;
+import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.indices.IndexClosedException;
 import org.elasticsearch.test.ESSingleNodeTestCase;
 import org.junit.Before;
@@ -50,7 +50,7 @@ import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Segment;
 import org.elasticsearch.index.mapper.string.StringFieldMapperPositionIncrementGapTests;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.shard.MergePolicyConfig;
+import org.elasticsearch.index.MergePolicyConfig;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
@@ -181,7 +181,7 @@ public class IndexWithShadowReplicasIT extends ESIntegTestCase {
         Settings idxSettings = Settings.builder()
                 .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                 .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
-                .put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB))
+                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB))
                 .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
                 .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
                 .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.elasticsearch.index.shard;
+package org.elasticsearch.index;

 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.TieredMergePolicy;
@@ -24,6 +24,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.MergePolicyConfig;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;

 import java.io.IOException;
@@ -21,52 +21,44 @@ package org.elasticsearch.index.engine;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.EngineAccess;
import org.elasticsearch.test.ESSingleNodeTestCase;

import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

public class InternalEngineSettingsTests extends ESSingleNodeTestCase {

    public void testSettingsUpdate() {
        final IndexService service = createIndex("foo");
        // INDEX_COMPOUND_ON_FLUSH
        InternalEngine engine = ((InternalEngine) EngineAccess.engine(service.getShardOrNull(0)));
        assertThat(engine.getCurrentIndexWriterConfig().getUseCompoundFile(), is(true));

        // VERSION MAP SIZE
        long indexBufferSize = engine.config().getIndexingBufferSize().bytes();

        final int iters = between(1, 20);
        for (int i = 0; i < iters; i++) {
            boolean compoundOnFlush = randomBoolean();

            // Tricky: TimeValue.parseTimeValue casts this long to a double, which steals 11 of the 64 bits for exponent, so we can't use
            // the full long range here else the assert below fails:
            long gcDeletes = random().nextLong() & (Long.MAX_VALUE >> 11);

            Settings build = Settings.builder()
                .put(EngineConfig.INDEX_GC_DELETES_SETTING, gcDeletes, TimeUnit.MILLISECONDS)
                .put(IndexSettings.INDEX_GC_DELETES_SETTING, gcDeletes, TimeUnit.MILLISECONDS)
                .build();
            assertEquals(gcDeletes, build.getAsTime(EngineConfig.INDEX_GC_DELETES_SETTING, null).millis());
            assertEquals(gcDeletes, build.getAsTime(IndexSettings.INDEX_GC_DELETES_SETTING, null).millis());

            client().admin().indices().prepareUpdateSettings("foo").setSettings(build).get();
            LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
            assertEquals(currentIndexWriterConfig.getUseCompoundFile(), true);

            assertEquals(engine.config().getGcDeletesInMillis(), gcDeletes);
            assertEquals(engine.config().getIndexSettings().getGcDeletesInMillis(), gcDeletes);
            assertEquals(engine.getGcDeletesInMillis(), gcDeletes);

            indexBufferSize = engine.config().getIndexingBufferSize().bytes();
        }

        Settings settings = Settings.builder()
            .put(EngineConfig.INDEX_GC_DELETES_SETTING, 1000, TimeUnit.MILLISECONDS)
            .put(IndexSettings.INDEX_GC_DELETES_SETTING, 1000, TimeUnit.MILLISECONDS)
            .build();
        client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
        assertEquals(engine.getGcDeletesInMillis(), 1000);

@@ -74,7 +66,7 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {

        settings = Settings.builder()
            .put(EngineConfig.INDEX_GC_DELETES_SETTING, "0ms")
            .put(IndexSettings.INDEX_GC_DELETES_SETTING, "0ms")
            .build();

        client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();

@@ -82,7 +74,7 @@ public class InternalEngineSettingsTests extends ESSingleNodeTestCase {
        assertTrue(engine.config().isEnableGcDeletes());

        settings = Settings.builder()
            .put(EngineConfig.INDEX_GC_DELETES_SETTING, 1000, TimeUnit.MILLISECONDS)
            .put(IndexSettings.INDEX_GC_DELETES_SETTING, 1000, TimeUnit.MILLISECONDS)
            .build();
        client().admin().indices().prepareUpdateSettings("foo").setSettings(settings).get();
        assertEquals(engine.getGcDeletesInMillis(), 1000);

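A side note on the masking trick kept by this change: TimeValue.parseTimeValue routes the value through a double, whose 11 exponent bits leave a 52-bit significand (53 bits counting the implicit leading one), so only longs below 2^53 survive the long-to-double-to-long round trip exactly; masking with Long.MAX_VALUE >> 11 keeps the random value safely inside that range. A standalone sketch in plain Java, independent of Elasticsearch:

    // Longs that fit the double significand round-trip exactly; wider values do not.
    public class DoubleRoundTripDemo {
        public static void main(String[] args) {
            long masked = Long.MAX_VALUE >> 11;   // 2^52 - 1: fits the significand, round-trips exactly
            long wide = (1L << 53) + 1;           // needs 54 significant bits, cannot be exact
            System.out.println(masked == (long) (double) masked); // true
            System.out.println(wide == (long) (double) wide);     // false: rounds to 2^53
        }
    }
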
@@ -61,8 +61,6 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.Index;

@@ -85,7 +83,7 @@ import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.object.RootObjectMapper;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;

@@ -95,7 +93,6 @@ import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.test.DummyShardLock;

@@ -168,7 +165,7 @@ public class InternalEngineTests extends ESTestCase {
            codecName = "default";
        }
        defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
                .put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
                .put(IndexSettings.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
                .put(EngineConfig.INDEX_CODEC_SETTING, codecName)
                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
                .build()); // TODO randomize more settings

@@ -260,19 +257,19 @@ public class InternalEngineTests extends ESTestCase {
    }

    protected InternalEngine createEngine(Store store, Path translogPath) {
        return createEngine(defaultSettings, store, translogPath, new MergeSchedulerConfig(defaultSettings), newMergePolicy());
        return createEngine(defaultSettings, store, translogPath, newMergePolicy());
    }

    protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
        return new InternalEngine(config(indexSettings, store, translogPath, mergeSchedulerConfig, mergePolicy), false);
    protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
        return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), false);
    }

    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
        IndexWriterConfig iwc = newIndexWriterConfig();
        TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);

        EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings
                , null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
                , null, store, createSnapshotDeletionPolicy(), mergePolicy,
                iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), new Engine.EventListener() {
            @Override
            public void onFailedEngine(String reason, @Nullable Throwable t) {

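Since EngineConfig now derives its merge-scheduler settings from IndexSettings, the helper overloads above each lose a parameter. Where a large test base cannot be migrated in one commit, a deprecated delegating overload is one way to bridge; the shim below is a hypothetical sketch, not part of this change:

    /** Hypothetical transitional shim: the scheduler config now comes from indexSettings, so the explicit argument is ignored. */
    @Deprecated
    protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath,
                                          MergeSchedulerConfig ignored, MergePolicy mergePolicy) {
        return createEngine(indexSettings, store, translogPath, mergePolicy);
    }
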
@@ -293,7 +290,7 @@ public class InternalEngineTests extends ESTestCase {

    public void testSegments() throws Exception {
        try (Store store = createStore();
                Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), NoMergePolicy.INSTANCE)) {
                Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
            List<Segment> segments = engine.segments(false);
            assertThat(segments.isEmpty(), equalTo(true));
            assertThat(engine.segmentsStats().getCount(), equalTo(0l));

@@ -411,7 +408,7 @@

    public void testVerboseSegments() throws Exception {
        try (Store store = createStore();
                Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), NoMergePolicy.INSTANCE)) {
                Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
            List<Segment> segments = engine.segments(true);
            assertThat(segments.isEmpty(), equalTo(true));

@@ -440,7 +437,7 @@

    public void testSegmentsWithMergeFlag() throws Exception {
        try (Store store = createStore();
                Engine engine = createEngine(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) {
                Engine engine = createEngine(defaultSettings, store, createTempDir(), new TieredMergePolicy())) {
            ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
            Engine.Index index = new Engine.Index(newUid("1"), doc);
            engine.index(index);

@@ -770,7 +767,7 @@

    public void testSyncedFlush() throws IOException {
        try (Store store = createStore();
                Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
                Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
                        new LogByteSizeMergePolicy()), false)) {
            final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
            ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);

@@ -797,7 +794,7 @@
        final int iters = randomIntBetween(2, 5); // run this a couple of times to get some coverage
        for (int i = 0; i < iters; i++) {
            try (Store store = createStore();
                    InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
                    InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
                            new LogDocMergePolicy()), false)) {
                final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
                ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);

@@ -1027,7 +1024,7 @@

    public void testForceMerge() throws IOException {
        try (Store store = createStore();
                Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
                Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
                        new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP
            int numDocs = randomIntBetween(10, 100);
            for (int i = 0; i < numDocs; i++) {

@@ -1466,7 +1463,7 @@

    public void testEnableGcDeletes() throws Exception {
        try (Store store = createStore();
                Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
                Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy()), false)) {
            engine.config().setEnableGcDeletes(false);

            // Add document

@@ -1605,7 +1602,7 @@
        IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(),
                Settings.builder().put(defaultSettings.getSettings()).put(EngineConfig.INDEX_FORCE_NEW_TRANSLOG, true).build(),
                Collections.emptyList());
        engine = createEngine(indexSettings, store, primaryTranslogDir, new MergeSchedulerConfig(indexSettings), newMergePolicy());
        engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy());
    }

    public void testTranslogReplayWithFailure() throws IOException {

@@ -1939,7 +1936,7 @@
        TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), BigArrays.NON_RECYCLING_INSTANCE);

        EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexSettings()
                , null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
                , null, store, createSnapshotDeletionPolicy(), newMergePolicy(),
                config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener()
                , config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5));

@@ -53,7 +53,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.DirectoryService;

@@ -117,7 +117,7 @@ public class ShadowEngineTests extends ESTestCase {
            codecName = "default";
        }
        defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
                .put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
                .put(IndexSettings.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
                .put(EngineConfig.INDEX_CODEC_SETTING, codecName)
                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
                .build()); // TODO randomize more settings

@@ -209,7 +209,7 @@
    }

    protected ShadowEngine createShadowEngine(IndexSettings indexSettings, Store store) {
        return new ShadowEngine(config(indexSettings, store, null, new MergeSchedulerConfig(indexSettings), null));
        return new ShadowEngine(config(indexSettings, store, null, null));
    }

    protected InternalEngine createInternalEngine(IndexSettings indexSettings, Store store, Path translogPath) {

@@ -217,14 +217,14 @@
    }

    protected InternalEngine createInternalEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
        return new InternalEngine(config(indexSettings, store, translogPath, new MergeSchedulerConfig(indexSettings), mergePolicy), true);
        return new InternalEngine(config(indexSettings, store, translogPath, mergePolicy), true);
    }

    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
        IndexWriterConfig iwc = newIndexWriterConfig();
        TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
        EngineConfig config = new EngineConfig(shardId, threadPool, indexSettings
                , null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
                , null, store, createSnapshotDeletionPolicy(), mergePolicy,
                iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(null, logger), new Engine.EventListener() {
            @Override
            public void onFailedEngine(String reason, @Nullable Throwable t) {

@@ -82,6 +82,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.search.stats.SearchSlowLog;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;

@@ -129,23 +130,23 @@ public class IndexShardTests extends ESSingleNodeTestCase {

    public void testFlushOnDeleteSetting() throws Exception {
        boolean initValue = randomBoolean();
        createIndex("test", settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, initValue).build());
        createIndex("test", settingsBuilder().put(IndexSettings.INDEX_FLUSH_ON_CLOSE, initValue).build());
        ensureGreen();
        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
        IndexService test = indicesService.indexService("test");
        IndexShard shard = test.getShardOrNull(0);
        assertEquals(initValue, shard.isFlushOnClose());
        assertEquals(initValue, shard.getIndexSettings().isFlushOnClose());
        final boolean newValue = !initValue;
        assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, newValue).build()));
        assertEquals(newValue, shard.isFlushOnClose());
        assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_FLUSH_ON_CLOSE, newValue).build()));
        assertEquals(newValue, shard.getIndexSettings().isFlushOnClose());

        try {
            assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_FLUSH_ON_CLOSE, "FOOBAR").build()));
            assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_FLUSH_ON_CLOSE, "FOOBAR").build()));
            fail("exception expected");
        } catch (IllegalArgumentException ex) {

        }
        assertEquals(newValue, shard.isFlushOnClose());
        assertEquals(newValue, shard.getIndexSettings().isFlushOnClose());

    }

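The FOOBAR branch above passes only because each dynamic index setting is registered together with a validator (see the registerIndexDynamicSetting calls in ClusterModule), so a malformed value is rejected before it ever reaches the shard and the last accepted value stays in effect. A conceptual sketch of such a check, with invented names rather than the real Validator API:

    // Illustrative only: a boolean check in the spirit of Validator.BOOLEAN.
    static String validateBoolean(String setting, String value) {
        if (!"true".equals(value) && !"false".equals(value)) {
            return "cannot parse [" + setting + "] as a boolean: [" + value + "]"; // non-null = rejected
        }
        return null; // null = accepted, the update is applied
    }
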
@@ -719,7 +720,7 @@
        IndexService test = indicesService.indexService("test");
        IndexShard shard = test.getShardOrNull(0);
        assertFalse(shard.shouldFlush());
        client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
        client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133 /* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
        client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
        assertFalse(shard.shouldFlush());
        ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null);

@@ -735,7 +736,7 @@
        shard.getEngine().getTranslog().sync();
        long size = shard.getEngine().getTranslog().sizeInBytes();
        logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
        client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES))
        client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(size, ByteSizeUnit.BYTES))
                .build()).get();
        client().prepareDelete("test", "test", "2").get();
        logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());

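Both hunks pick thresholds so the size-based flush trigger flips on the very next operation: 133 bytes is exactly one operation plus the translog header and footer, and pinning the threshold to the translog's current size means any further write crosses it. A back-of-envelope sketch of that condition, with the byte counts assumed purely for illustration:

    long thresholdBytes = 570; // assumed: translog size captured just before the settings update
    long translogBytes = 570;  // current size equals the threshold, so no flush is due yet
    boolean flushBefore = translogBytes > thresholdBytes;  // false
    translogBytes += 30;       // assumed on-disk size of the subsequent delete operation
    boolean flushAfter = translogBytes > thresholdBytes;   // true: shouldFlush() now reports true
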
@@ -753,7 +754,7 @@
        IndexService test = indicesService.indexService("test");
        final IndexShard shard = test.getShardOrNull(0);
        assertFalse(shard.shouldFlush());
        client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
        client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(133/* size of the operation + header&footer*/, ByteSizeUnit.BYTES)).build()).get();
        client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
        assertFalse(shard.shouldFlush());
        final AtomicBoolean running = new AtomicBoolean(true);

@@ -1064,7 +1065,7 @@
        ShardRouting routing = new ShardRouting(shard.routingEntry());
        shard.close("simon says", true);
        NodeServicesProvider indexServices = indexService.getIndexServices();
        IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, listeners);
        IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, indexService.getSearchSlowLog(), listeners);
        ShardRoutingHelper.reinit(routing);
        newShard.updateRoutingEntry(routing, false);
        DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);

@@ -49,13 +49,13 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.gateway.PrimaryShardAllocator;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;

@@ -142,7 +142,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
                .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
                .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
                .put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
                .put("indices.recovery.concurrent_streams", 10)
        ));
        ensureGreen();

@@ -247,7 +247,7 @@
                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
                .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
                .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
                .put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
                .put("indices.recovery.concurrent_streams", 10)
        ));
        ensureGreen();

@@ -473,7 +473,7 @@
                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
                .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
                .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
                .put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
                .put("indices.recovery.concurrent_streams", 10)
        ));
        ensureGreen();

@@ -528,7 +528,7 @@
                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
                .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
                .put(MockFSIndexStore.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
                .put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
                .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)) // no translog based flush - it might change the .liv / segments.N files
                .put("indices.recovery.concurrent_streams", 10)
        ));
        ensureGreen();

@@ -33,7 +33,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;

@@ -79,7 +79,7 @@ public class CorruptedTranslogIT extends ESIntegTestCase {
                .put("index.number_of_replicas", 0)
                .put("index.refresh_interval", "-1")
                .put(MockEngineSupport.FLUSH_ON_CLOSE_RATIO, 0.0d) // never flush - always recover from translog
                .put(IndexShard.INDEX_FLUSH_ON_CLOSE, false) // never flush - always recover from translog
                .put(IndexSettings.INDEX_FLUSH_ON_CLOSE, false) // never flush - always recover from translog
        ));
        ensureYellow();

@@ -170,13 +170,13 @@

    /** Disables translog flushing for the specified index */
    private static void disableTranslogFlush(String index) {
        Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).build();
        Settings settings = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).build();
        client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
    }

    /** Enables translog flushing for the specified index */
    private static void enableTranslogFlush(String index) {
        Settings settings = Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)).build();
        Settings settings = Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB)).build();
        client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
    }
}

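A hedged usage sketch for the two helpers above (the index name and document are illustrative): disabling flushes keeps every operation in the translog so a later recovery has something to replay, and flushing is restored once the test is done corrupting things.

    disableTranslogFlush("test");                                    // 1 PB threshold: size-based flush never fires
    client().prepareIndex("test", "doc", "1").setSource("{}").get(); // this operation stays in the translog
    // ... corrupt the translog / restart the node; recovery must replay it or surface the corruption ...
    enableTranslogFlush("test");                                     // back to a realistic 512 MB threshold
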
@@ -30,8 +30,8 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;

@@ -153,7 +153,7 @@ public class FlushIT extends ESIntegTestCase {
        createIndex("test");

        client().admin().indices().prepareUpdateSettings("test").setSettings(
                Settings.builder().put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1))
                Settings.builder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)).put("index.refresh_interval", -1).put("index.number_of_replicas", internalCluster().numDataNodes() - 1))
                .get();
        ensureGreen();
        final AtomicBoolean stop = new AtomicBoolean(false);

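This test races synced flushes against concurrent indexing. A hedged sketch of the API being exercised, with the method and response accessors as we understand the 2.x Java client, so treat the exact names as assumptions:

    SyncedFlushResponse resp = client().admin().indices().prepareSyncedFlush("test").get();
    // Shards with in-flight operations simply fail to sync; nothing is retried here.
    logger.info("synced flush: [{}] successful, [{}] failed, [{}] total",
            resp.successfulShards(), resp.failedShards(), resp.totalShards());
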
@@ -31,8 +31,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.test.ESIntegTestCase;

@@ -44,8 +44,8 @@ import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;

@@ -31,7 +31,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;

@@ -36,7 +36,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;

@@ -101,9 +101,8 @@ import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Loading;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;

@@ -511,10 +510,10 @@ public abstract class ESIntegTestCase extends ESTestCase {

    private static Settings.Builder setRandomIndexTranslogSettings(Random random, Settings.Builder builder) {
        if (random.nextBoolean()) {
            builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
            builder.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 300), ByteSizeUnit.MB));
        }
        if (random.nextBoolean()) {
            builder.put(IndexShard.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
            builder.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(1, ByteSizeUnit.PB)); // just don't flush
        }
        if (random.nextBoolean()) {
            builder.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, RandomPicks.randomFrom(random, Translog.Durability.values()));

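Because this helper randomizes translog behavior per test run, a test whose assertions depend on flush timing or durability can pin the same settings explicitly at index creation. A hedged sketch using only the setting keys visible above:

    Settings pinned = Settings.builder()
            .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE, new ByteSizeValue(512, ByteSizeUnit.MB))
            .put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.REQUEST)
            .build();
    // e.g. createIndex("stable-test", pinned); the randomized defaults then no longer apply
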