Cleanup MergeScheduler infrastrucutre

This commit cleans up all the MergeScheduler infrastructure
and simplifies / removes all unneeded abstractions. The MergeScheduler
itself is now private to the Engine and all abstractions like Providers
that had support for multiple merge schedulers etc. are removed.

Closes #11602
This commit is contained in:
Simon Willnauer 2015-06-11 16:12:42 +02:00
parent 483a15a12b
commit 440580dd55
19 changed files with 334 additions and 500 deletions

View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.index;
/**
* Allows pkg private access
*/
public class OneMergeHelper {
private OneMergeHelper() {}
public static String getSegmentName(MergePolicy.OneMerge merge) {
return merge.info != null ? merge.info.info.name : "_na_";
}
}

View File

@ -41,7 +41,6 @@ import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
@ -370,7 +369,6 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
closeInjectorResource(sId, shardInjector,
MergeSchedulerProvider.class,
IndexShardGatewayService.class,
PercolatorQueriesRegistry.class);

View File

@ -17,15 +17,22 @@
* under the License.
*/
package org.apache.lucene.index;
package org.elasticsearch.index.engine;
import org.apache.lucene.index.*;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Collections;
@ -36,9 +43,11 @@ import java.util.Set;
* An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total
* and current merges.
*/
public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
class ElasticsearchConcurrentMergeScheduler extends ConcurrentMergeScheduler {
protected final ESLogger logger;
private final Settings indexSettings;
private final ShardId shardId;
private final MeanMetric totalMerges = new MeanMetric();
private final CounterMetric totalMergesNumDocs = new CounterMetric();
@ -51,46 +60,14 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
private final MergeSchedulerConfig config;
public TrackingConcurrentMergeScheduler(ESLogger logger) {
super();
this.logger = logger;
}
public long totalMerges() {
return totalMerges.count();
}
public long totalMergeTime() {
return totalMerges.sum();
}
public long totalMergeNumDocs() {
return totalMergesNumDocs.count();
}
public long totalMergeSizeInBytes() {
return totalMergesSizeInBytes.count();
}
public long currentMerges() {
return currentMerges.count();
}
public long currentMergesNumDocs() {
return currentMergesNumDocs.count();
}
public long currentMergesSizeInBytes() {
return currentMergesSizeInBytes.count();
}
public long totalMergeStoppedTimeMillis() {
return totalMergeStoppedTime.count();
}
public long totalMergeThrottledTimeMillis() {
return totalMergeThrottledTime.count();
public ElasticsearchConcurrentMergeScheduler(ShardId shardId, Settings indexSettings, MergeSchedulerConfig config) {
this.config = config;
this.shardId = shardId;
this.indexSettings = indexSettings;
this.logger = Loggers.getLogger(getClass(), indexSettings, shardId);
refreshConfig();
}
public Set<OnGoingMerge> onGoingMerges() {
@ -110,7 +87,7 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
onGoingMerges.add(onGoingMerge);
if (logger.isTraceEnabled()) {
logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", merge.info == null ? "_na_" : merge.info.info.name, merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes));
logger.trace("merge [{}] starting..., merging [{}] segments, [{}] docs, [{}] size, into [{}] estimated_size", OneMergeHelper.getSegmentName(merge), merge.segments.size(), totalNumDocs, new ByteSizeValue(totalSizeInBytes), new ByteSizeValue(merge.estimatedMergeBytes));
}
try {
beforeMerge(onGoingMerge);
@ -137,7 +114,7 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
String message = String.format(Locale.ROOT,
"merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs], [%s stopped], [%s throttled], [%,.1f MB written], [%,.1f MB/sec throttle]",
merge.info == null ? "_na_" : merge.info.info.name,
OneMergeHelper.getSegmentName(merge),
TimeValue.timeValueMillis(tookMS),
totalSizeInBytes/1024f/1024f,
totalNumDocs,
@ -157,16 +134,12 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
/**
* A callback allowing for custom logic before an actual merge starts.
*/
protected void beforeMerge(OnGoingMerge merge) {
}
protected void beforeMerge(OnGoingMerge merge) {}
/**
* A callback allowing for custom logic before an actual merge starts.
*/
protected void afterMerge(OnGoingMerge merge) {
}
protected void afterMerge(OnGoingMerge merge) {}
@Override
public MergeScheduler clone() {
@ -174,4 +147,40 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
// the clone will just be the identity.
return this;
}
@Override
protected boolean maybeStall(IndexWriter writer) {
// Don't stall here, because we do our own index throttling (in InternalEngine.IndexThrottle) when merges can't keep up
return true;
}
@Override
protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = super.getMergeThread(writer, merge);
thread.setName(EsExecutors.threadName(indexSettings, "[" + shardId.index().name() + "][" + shardId.id() + "]: " + thread.getName()));
return thread;
}
MergeStats stats() {
final MergeStats mergeStats = new MergeStats();
mergeStats.add(totalMerges.count(), totalMerges.sum(), totalMergesNumDocs.count(), totalMergesSizeInBytes.count(),
currentMerges.count(), currentMergesNumDocs.count(), currentMergesSizeInBytes.count(),
totalMergeStoppedTime.count(),
totalMergeThrottledTime.count(),
config.isAutoThrottle() ? getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY);
return mergeStats;
}
void refreshConfig() {
if (this.getMaxMergeCount() != config.getMaxMergeCount() || this.getMaxThreadCount() != config.getMaxThreadCount()) {
this.setMaxMergesAndThreads(config.getMaxMergeCount(), config.getMaxThreadCount());
}
boolean isEnabled = getIORateLimitMBPerSec() != Double.POSITIVE_INFINITY;
if (config.isAutoThrottle() && isEnabled == false) {
enableAutoIOThrottle();
} else if (config.isAutoThrottle() == false && isEnabled){
disableAutoIOThrottle();
}
}
}

View File

@ -45,10 +45,12 @@ import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@ -147,6 +149,10 @@ public abstract class Engine implements Closeable {
protected abstract SegmentInfos getLastCommittedSegmentInfos();
public MergeStats getMergeStats() {
return new MergeStats();
}
/** A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
* is enabled
@ -1186,4 +1192,6 @@ public abstract class Engine implements Closeable {
return Arrays.hashCode(id);
}
}
public void onSettingsChanged() {}
}

View File

@ -22,6 +22,7 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.similarities.Similarity;
@ -34,7 +35,6 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
@ -70,7 +70,7 @@ public final class EngineConfig {
private final Store store;
private final SnapshotDeletionPolicy deletionPolicy;
private final MergePolicy mergePolicy;
private final MergeSchedulerProvider mergeScheduler;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final Analyzer analyzer;
private final Similarity similarity;
private final CodecService codecService;
@ -142,7 +142,7 @@ public final class EngineConfig {
*/
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService,
IndexSettingsService indexSettingsService, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, MergeSchedulerProvider mergeScheduler, Analyzer analyzer,
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache filterCache, QueryCachingPolicy filterCachingPolicy, TranslogConfig translogConfig) {
this.shardId = shardId;
@ -153,7 +153,7 @@ public final class EngineConfig {
this.store = store;
this.deletionPolicy = deletionPolicy;
this.mergePolicy = mergePolicy;
this.mergeScheduler = mergeScheduler;
this.mergeSchedulerConfig = mergeSchedulerConfig;
this.analyzer = analyzer;
this.similarity = similarity;
this.codecService = codecService;
@ -347,11 +347,10 @@ public final class EngineConfig {
}
/**
* Returns the {@link org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider} used to obtain
* a {@link org.apache.lucene.index.MergeScheduler} for the engines {@link org.apache.lucene.index.IndexWriter}
* Returns the {@link MergeSchedulerConfig}
*/
public MergeSchedulerProvider getMergeScheduler() {
return mergeScheduler;
public MergeSchedulerConfig getMergeSchedulerConfig() {
return mergeSchedulerConfig;
}
/**

View File

@ -25,6 +25,7 @@ import org.apache.lucene.index.IndexWriter.IndexReaderWarmer;
import org.apache.lucene.search.BooleanClause.Occur;
import org.apache.lucene.search.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
@ -39,17 +40,17 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
@ -68,9 +69,6 @@ import java.util.concurrent.locks.ReentrantLock;
*
*/
public class InternalEngine extends Engine {
private final FailEngineOnMergeFailure mergeSchedulerFailureListener;
private final MergeSchedulerListener mergeSchedulerListener;
/**
* When we last pruned expired tombstones from versionMap.deletes:
*/
@ -80,7 +78,7 @@ public class InternalEngine extends Engine {
@Nullable
private final IndicesWarmer warmer;
private final Translog translog;
private final MergeSchedulerProvider mergeScheduler;
private final ElasticsearchConcurrentMergeScheduler mergeScheduler;
private final IndexWriter indexWriter;
@ -109,12 +107,13 @@ public class InternalEngine extends Engine {
IndexWriter writer = null;
Translog translog = null;
SearcherManager manager = null;
EngineMergeScheduler scheduler = null;
boolean success = false;
try {
this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().estimatedTimeInMillis();
this.indexingService = engineConfig.getIndexingService();
this.warmer = engineConfig.getWarmer();
this.mergeScheduler = engineConfig.getMergeScheduler();
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings(), engineConfig.getMergeSchedulerConfig());
this.dirtyLocks = new Object[engineConfig.getIndexConcurrency() * 50]; // we multiply it to have enough...
for (int i = 0; i < dirtyLocks.length; i++) {
dirtyLocks[i] = new Object();
@ -139,10 +138,6 @@ public class InternalEngine extends Engine {
manager = createSearcherManager();
this.searcherManager = manager;
this.versionMap.setManager(searcherManager);
this.mergeSchedulerFailureListener = new FailEngineOnMergeFailure();
this.mergeSchedulerListener = new MergeSchedulerListener();
this.mergeScheduler.addListener(mergeSchedulerListener);
this.mergeScheduler.addFailureListener(mergeSchedulerFailureListener);
try {
if (skipInitialTranslogRecovery) {
// make sure we point at the latest translog from now on..
@ -156,7 +151,7 @@ public class InternalEngine extends Engine {
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(writer, translog, manager);
IOUtils.closeWhileHandlingException(writer, translog, manager, scheduler);
versionMap.clear();
if (isClosed.get() == false) {
// failure we need to dec the store reference
@ -667,6 +662,7 @@ public class InternalEngine extends Engine {
// for a long time:
maybePruneDeletedTombstones();
versionMapRefreshPending.set(false);
mergeScheduler.refreshConfig();
}
@Override
@ -971,8 +967,6 @@ public class InternalEngine extends Engine {
logger.warn("failed to rollback writer on close", e);
} finally {
store.decRef();
this.mergeScheduler.removeListener(mergeSchedulerListener);
this.mergeScheduler.removeFailureListener(mergeSchedulerFailureListener);
logger.debug("engine closed [{}]", reason);
}
}
@ -1016,7 +1010,7 @@ public class InternalEngine extends Engine {
} catch (Throwable ignore) {
}
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger));
iwc.setMergeScheduler(mergeScheduler.newMergeScheduler());
iwc.setMergeScheduler(mergeScheduler);
MergePolicy mergePolicy = config().getMergePolicy();
// Give us the opportunity to upgrade old segments while performing
// background merges
@ -1155,24 +1149,18 @@ public class InternalEngine extends Engine {
}
class FailEngineOnMergeFailure implements MergeSchedulerProvider.FailureListener {
@Override
public void onFailedMerge(MergePolicy.MergeException e) {
if (Lucene.isCorruptionException(e)) {
failEngine("corrupt file detected source: [merge]", e);
} else {
failEngine("merge exception", e);
}
}
}
class MergeSchedulerListener implements MergeSchedulerProvider.Listener {
private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler {
private final AtomicInteger numMergesInFlight = new AtomicInteger(0);
private final AtomicBoolean isThrottling = new AtomicBoolean();
EngineMergeScheduler(ShardId shardId, Settings indexSettings, MergeSchedulerConfig config) {
super(shardId, indexSettings, config);
}
@Override
public synchronized void beforeMerge(OnGoingMerge merge) {
int maxNumMerges = mergeScheduler.getMaxMerges();
int maxNumMerges = mergeScheduler.getMaxMergeCount();
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
if (isThrottling.getAndSet(true) == false) {
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
@ -1184,7 +1172,7 @@ public class InternalEngine extends Engine {
@Override
public synchronized void afterMerge(OnGoingMerge merge) {
int maxNumMerges = mergeScheduler.getMaxMerges();
int maxNumMerges = mergeScheduler.getMaxMergeCount();
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
if (isThrottling.getAndSet(false)) {
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
@ -1193,6 +1181,24 @@ public class InternalEngine extends Engine {
}
}
}
@Override
protected void handleMergeException(final Directory dir, final Throwable exc) {
logger.error("failed to merge", exc);
if (config().getMergeSchedulerConfig().isNotifyOnMergeFailure()) {
engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
logger.debug("merge failure action rejected", t);
}
@Override
protected void doRun() throws Exception {
MergePolicy.MergeException e = new MergePolicy.MergeException(exc, dir);
failEngine("merge failed", e);
}
});
}
}
}
private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
@ -1217,4 +1223,13 @@ public class InternalEngine extends Engine {
commitIndexWriter(writer, translog, null);
}
public void onSettingsChanged() {
mergeScheduler.refreshConfig();
}
public MergeStats getMergeStats() {
return mergeScheduler.stats();
}
}

View File

@ -242,4 +242,5 @@ public class ShadowEngine extends Engine {
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
}
}

View File

@ -1,209 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.scheduler;
import com.google.common.collect.ImmutableSet;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.apache.lucene.index.TrackingConcurrentMergeScheduler;
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
*
*/
public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
private final IndexSettingsService indexSettingsService;
private final ApplySettings applySettings = new ApplySettings();
public static final String MAX_THREAD_COUNT = "index.merge.scheduler.max_thread_count";
public static final String MAX_MERGE_COUNT = "index.merge.scheduler.max_merge_count";
public static final String AUTO_THROTTLE = "index.merge.scheduler.auto_throttle";
private volatile int maxThreadCount;
private volatile int maxMergeCount;
private volatile boolean autoThrottle;
private Set<CustomConcurrentMergeScheduler> schedulers = new CopyOnWriteArraySet<>();
@Inject
public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexSettingsService indexSettingsService) {
super(shardId, indexSettings, threadPool);
this.indexSettingsService = indexSettingsService;
this.maxThreadCount = indexSettings.getAsInt(MAX_THREAD_COUNT, Math.max(1, Math.min(4, EsExecutors.boundedNumberOfProcessors(indexSettings) / 2)));
this.maxMergeCount = indexSettings.getAsInt(MAX_MERGE_COUNT, maxThreadCount + 5);
this.autoThrottle = indexSettings.getAsBoolean(AUTO_THROTTLE, true);
logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}], auto_throttle[{}]", maxThreadCount, maxMergeCount, autoThrottle);
indexSettingsService.addListener(applySettings);
}
@Override
public MergeScheduler newMergeScheduler() {
CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this);
// NOTE: we pass maxMergeCount+1 here so that CMS will allow one too many merges to kick off which then allows
// InternalEngine.IndexThrottle to detect too-many-merges and throttle:
concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount+1, maxThreadCount);
if (autoThrottle) {
concurrentMergeScheduler.enableAutoIOThrottle();
} else {
concurrentMergeScheduler.disableAutoIOThrottle();
}
schedulers.add(concurrentMergeScheduler);
return concurrentMergeScheduler;
}
@Override
public MergeStats stats() {
MergeStats mergeStats = new MergeStats();
// TODO: why would there be more than one CMS for a single shard...?
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
mergeStats.add(scheduler.totalMerges(), scheduler.totalMergeTime(), scheduler.totalMergeNumDocs(), scheduler.totalMergeSizeInBytes(),
scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes(),
scheduler.totalMergeStoppedTimeMillis(),
scheduler.totalMergeThrottledTimeMillis(),
autoThrottle ? scheduler.getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY);
}
return mergeStats;
}
@Override
public Set<OnGoingMerge> onGoingMerges() {
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
return scheduler.onGoingMerges();
}
return ImmutableSet.of();
}
@Override
public void close() {
indexSettingsService.removeListener(applySettings);
}
@Override
public int getMaxMerges() {
return this.maxMergeCount;
}
public static class CustomConcurrentMergeScheduler extends TrackingConcurrentMergeScheduler {
private final ShardId shardId;
private final ConcurrentMergeSchedulerProvider provider;
private CustomConcurrentMergeScheduler(ESLogger logger, ShardId shardId, ConcurrentMergeSchedulerProvider provider) {
super(logger);
this.shardId = shardId;
this.provider = provider;
}
@Override
protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = super.getMergeThread(writer, merge);
thread.setName(EsExecutors.threadName(provider.indexSettings(), "[" + shardId.index().name() + "][" + shardId.id() + "]: " + thread.getName()));
return thread;
}
@Override
protected void handleMergeException(Directory dir, Throwable exc) {
logger.error("failed to merge", exc);
provider.failedMerge(new MergePolicy.MergeException(exc, dir));
// NOTE: do not call super.handleMergeException here, which would just re-throw the exception
// and let Java's thread exc handler see it / log it to stderr, but we already 1) logged it
// and 2) handled the exception by failing the engine
}
@Override
public void close() {
super.close();
provider.schedulers.remove(this);
}
@Override
protected void beforeMerge(OnGoingMerge merge) {
super.beforeMerge(merge);
provider.beforeMerge(merge);
}
@Override
protected void afterMerge(OnGoingMerge merge) {
super.afterMerge(merge);
provider.afterMerge(merge);
}
@Override
protected boolean maybeStall(IndexWriter writer) {
// Don't stall here, because we do our own index throttling (in InternalEngine.IndexThrottle) when merges can't keep up
return true;
}
}
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
int maxThreadCount = settings.getAsInt(MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) {
logger.info("updating [{}] from [{}] to [{}]", MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount;
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount);
}
}
int maxMergeCount = settings.getAsInt(MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount);
if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) {
logger.info("updating [{}] from [{}] to [{}]", MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount;
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
}
}
boolean autoThrottle = settings.getAsBoolean(AUTO_THROTTLE, ConcurrentMergeSchedulerProvider.this.autoThrottle);
if (autoThrottle != ConcurrentMergeSchedulerProvider.this.autoThrottle) {
logger.info("updating [{}] from [{}] to [{}]", AUTO_THROTTLE, ConcurrentMergeSchedulerProvider.this.autoThrottle, autoThrottle);
ConcurrentMergeSchedulerProvider.this.autoThrottle = autoThrottle;
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
if (autoThrottle) {
scheduler.enableAutoIOThrottle();
} else {
scheduler.disableAutoIOThrottle();
}
}
}
}
}
}

View File

@ -1,128 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.merge.scheduler;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeScheduler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*
*/
public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements Closeable {
public static interface FailureListener {
void onFailedMerge(MergePolicy.MergeException e);
}
/**
* Listener for events before/after single merges. Called on the merge thread.
*/
public static interface Listener {
/**
* A callback before a merge is going to execute. Note, any logic here will block the merge
* till its done.
*/
void beforeMerge(OnGoingMerge merge);
/**
* A callback after a merge is going to execute. Note, any logic here will block the merge
* thread.
*/
void afterMerge(OnGoingMerge merge);
}
private final ThreadPool threadPool;
private final CopyOnWriteArrayList<FailureListener> failureListeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
private final boolean notifyOnMergeFailure;
protected MergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
super(shardId, indexSettings);
this.threadPool = threadPool;
this.notifyOnMergeFailure = indexSettings.getAsBoolean("index.merge.scheduler.notify_on_failure", true);
}
public void addFailureListener(FailureListener listener) {
failureListeners.add(listener);
}
public void removeFailureListener(FailureListener listener) {
failureListeners.remove(listener);
}
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
protected void failedMerge(final MergePolicy.MergeException e) {
if (!notifyOnMergeFailure) {
return;
}
for (final FailureListener failureListener : failureListeners) {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
failureListener.onFailedMerge(e);
}
});
}
}
protected void beforeMerge(OnGoingMerge merge) {
for (Listener listener : listeners) {
listener.beforeMerge(merge);
}
}
protected void afterMerge(OnGoingMerge merge) {
for (Listener listener : listeners) {
listener.afterMerge(merge);
}
}
/** Maximum number of allowed running merges before index throttling kicks in. */
public abstract int getMaxMerges();
public abstract MergeScheduler newMergeScheduler();
public abstract MergeStats stats();
public abstract Set<OnGoingMerge> onGoingMerges();
@Override
public abstract void close();
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.settings;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@ -30,7 +31,6 @@ import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.MergePolicyConfig;
@ -51,9 +51,9 @@ public class IndexDynamicSettingsModule extends AbstractModule {
indexDynamicSettings = new DynamicSettings();
indexDynamicSettings.addDynamicSetting(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(IndexStore.INDEX_STORE_THROTTLE_TYPE);
indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT);
indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT);
indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE);
indexDynamicSettings.addDynamicSetting(MergeSchedulerConfig.MAX_THREAD_COUNT);
indexDynamicSettings.addDynamicSetting(MergeSchedulerConfig.MAX_MERGE_COUNT);
indexDynamicSettings.addDynamicSetting(MergeSchedulerConfig.AUTO_THROTTLE);
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_REQUIRE_GROUP + "*");
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");

View File

@ -22,9 +22,7 @@ package org.elasticsearch.index.shard;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.index.*;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
@ -77,7 +75,6 @@ import org.elasticsearch.index.indexing.IndexingStats;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
@ -131,7 +128,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexCache indexCache;
private final InternalIndicesLifecycle indicesLifecycle;
private final Store store;
private final MergeSchedulerProvider mergeScheduler;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final IndexAliasesService indexAliasesService;
private final ShardIndexingService indexingService;
private final ShardSearchService searchService;
@ -194,7 +191,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexShardOperationCounter indexShardOperationCounter;
@Inject
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler,
public IndexShard(ShardId shardId, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService,
@ -212,7 +209,7 @@ public class IndexShard extends AbstractIndexShardComponent {
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
this.store = store;
this.mergeScheduler = mergeScheduler;
this.mergeSchedulerConfig = new MergeSchedulerConfig(indexSettings);
this.threadPool = threadPool;
this.mapperService = mapperService;
this.queryParserService = queryParserService;
@ -611,7 +608,11 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public MergeStats mergeStats() {
return mergeScheduler.stats();
final Engine engine = engineUnsafe();
if (engine == null) {
return new MergeStats();
}
return engine.getMergeStats();
}
public SegmentsStats segmentStats() {
@ -1097,6 +1098,27 @@ public class IndexShard extends AbstractIndexShardComponent {
if (config.getVersionMapSizeSetting().equals(versionMapSize) == false) {
config.setVersionMapSizeSetting(versionMapSize);
}
final int maxThreadCount = settings.getAsInt(MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxThreadCount());
if (maxThreadCount != mergeSchedulerConfig.getMaxThreadCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_THREAD_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxThreadCount);
mergeSchedulerConfig.setMaxThreadCount(maxThreadCount);
change = true;
}
final int maxMergeCount = settings.getAsInt(MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount());
if (maxMergeCount != mergeSchedulerConfig.getMaxMergeCount()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.MAX_MERGE_COUNT, mergeSchedulerConfig.getMaxMergeCount(), maxMergeCount);
mergeSchedulerConfig.setMaxMergeCount(maxMergeCount);
change = true;
}
final boolean autoThrottle = settings.getAsBoolean(MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle());
if (autoThrottle != mergeSchedulerConfig.isAutoThrottle()) {
logger.info("updating [{}] from [{}] to [{}]", MergeSchedulerConfig.AUTO_THROTTLE, mergeSchedulerConfig.isAutoThrottle(), autoThrottle);
mergeSchedulerConfig.setAutoThrottle(autoThrottle);
change = true;
}
}
mergePolicyConfig.onRefreshSettings(settings);
if (change) {
@ -1338,7 +1360,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeScheduler,
threadPool, indexingService, indexSettingsService, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperAnalyzer, similarityService.similarity(), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.filter(), indexCache.filterPolicy(), translogConfig);
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.shard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.query.ShardQueryCache;
@ -34,15 +33,11 @@ import org.elasticsearch.index.gateway.IndexShardGatewayService;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.search.slowlog.ShardSlowLogSearchService;
import org.elasticsearch.index.search.stats.ShardSearchService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotAndRestoreService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.suggest.stats.ShardSuggestService;
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
import org.elasticsearch.index.translog.TranslogService;
@ -92,7 +87,6 @@ public class IndexShardModule extends AbstractModule {
}
bind(EngineFactory.class).to(settings.getAsClass(ENGINE_FACTORY, DEFAULT_ENGINE_FACTORY_CLASS, ENGINE_PREFIX, ENGINE_SUFFIX));
bind(MergeSchedulerProvider.class).to(ConcurrentMergeSchedulerProvider.class).asEagerSingleton();
bind(ShardIndexWarmerService.class).asEagerSingleton();
bind(ShardIndexingService.class).asEagerSingleton();
bind(ShardSlowLogIndexingService.class).asEagerSingleton();

View File

@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
/**
*
*/
public final class MergeSchedulerConfig {
public static final String MAX_THREAD_COUNT = "index.merge.scheduler.max_thread_count";
public static final String MAX_MERGE_COUNT = "index.merge.scheduler.max_merge_count";
public static final String AUTO_THROTTLE = "index.merge.scheduler.auto_throttle";
public static final String NOTIFY_ON_MERGE_FAILURE = "index.merge.scheduler.notify_on_failure"; // why would we not wanna do this?
private volatile boolean autoThrottle;
private volatile int maxThreadCount;
private volatile int maxMergeCount;
private final boolean notifyOnMergeFailure;
public MergeSchedulerConfig(Settings indexSettings) {
maxThreadCount = indexSettings.getAsInt(MAX_THREAD_COUNT, Math.max(1, Math.min(4, EsExecutors.boundedNumberOfProcessors(indexSettings) / 2)));
maxMergeCount = indexSettings.getAsInt(MAX_MERGE_COUNT, maxThreadCount + 5);
this.autoThrottle = indexSettings.getAsBoolean(AUTO_THROTTLE, true);
notifyOnMergeFailure = indexSettings.getAsBoolean(NOTIFY_ON_MERGE_FAILURE, true);
}
/**
* Returns <code>true</code> iff auto throttle is enabled.
* @see ConcurrentMergeScheduler#enableAutoIOThrottle()
*/
public boolean isAutoThrottle() {
return autoThrottle;
}
/**
* Enables / disables auto throttling on the {@link ConcurrentMergeScheduler}
*/
public void setAutoThrottle(boolean autoThrottle) {
this.autoThrottle = autoThrottle;
}
/**
* Returns {@code maxThreadCount}.
*/
public int getMaxThreadCount() {
return maxThreadCount;
}
/**
* Expert: directly set the maximum number of merge threads and
* simultaneous merges allowed.
*/
public void setMaxThreadCount(int maxThreadCount) {
this.maxThreadCount = maxThreadCount;
}
/**
* Returns {@code maxMergeCount}.
*/
public int getMaxMergeCount() {
return maxMergeCount;
}
/**
*
* Expert: set the maximum number of simultaneous merges allowed.
*/
public void setMaxMergeCount(int maxMergeCount) {
this.maxMergeCount = maxMergeCount;
}
/**
* Returns <code>true</code> iff we fail the engine on a merge failure. Default is <code>true</code>
*/
public boolean isNotifyOnMergeFailure() {
return notifyOnMergeFailure;
}
}

View File

@ -40,7 +40,7 @@ import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
@ -67,7 +67,7 @@ public final class ShadowIndexShard extends IndexShard {
@Inject
public ShadowIndexShard(ShardId shardId, IndexSettingsService indexSettingsService,
IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler,
IndicesLifecycle indicesLifecycle, Store store,
ThreadPool threadPool, MapperService mapperService,
IndexQueryParserService queryParserService, IndexCache indexCache,
IndexAliasesService indexAliasesService, ShardIndexingService indexingService,
@ -81,7 +81,7 @@ public final class ShadowIndexShard extends IndexShard {
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
EngineFactory factory, ClusterService clusterService,
NodeEnvironment nodeEnv, ShardPath path, BigArrays bigArrays) throws IOException {
super(shardId, indexSettingsService, indicesLifecycle, store, mergeScheduler,
super(shardId, indexSettingsService, indicesLifecycle, store,
threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indexingService, getService, searchService, shardWarmerService, shardFilterCache,
shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService,
@ -104,6 +104,11 @@ public final class ShadowIndexShard extends IndexShard {
super.updateRoutingEntry(newRouting, persistState);
}
@Override
public MergeStats mergeStats() {
return new MergeStats();
}
@Override
public boolean canIndex() {
return false;

View File

@ -64,10 +64,9 @@ import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.object.RootObjectMapper;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexDynamicSettingsModule;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
@ -231,25 +230,22 @@ public class InternalEngineTests extends ElasticsearchTestCase {
return new SnapshotDeletionPolicy(createIndexDeletionPolicy());
}
protected MergeSchedulerProvider createMergeScheduler(IndexSettingsService indexSettingsService) {
return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService);
}
protected InternalEngine createEngine(Store store, Path translogPath) {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
return createEngine(indexSettingsService, store, translogPath, createMergeScheduler(indexSettingsService), newMergePolicy());
return createEngine(indexSettingsService, store, translogPath, new MergeSchedulerConfig(indexSettingsService.indexSettings()), newMergePolicy());
}
protected InternalEngine createEngine(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider, MergePolicy mergePolicy) {
return new InternalEngine(config(indexSettingsService, store, translogPath, mergeSchedulerProvider, mergePolicy), false);
protected InternalEngine createEngine(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
return new InternalEngine(config(indexSettingsService, store, translogPath, mergeSchedulerConfig, mergePolicy), false);
}
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider, MergePolicy mergePolicy) {
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig, MergePolicy mergePolicy) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettingsService.getSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerProvider,
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
@ -413,10 +409,9 @@ public class InternalEngineTests extends ElasticsearchTestCase {
@Test
public void testSegmentsWithMergeFlag() throws Exception {
ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
try (Store store = createStore();
Engine engine = createEngine(indexSettingsService, store, createTempDir(), mergeSchedulerProvider, new TieredMergePolicy())) {
Engine engine = createEngine(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), new TieredMergePolicy())) {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
Engine.Index index = new Engine.Index(null, newUid("1"), doc);
engine.index(index);
@ -685,7 +680,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
public void testSyncedFlush() throws IOException {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
try (Store store = createStore();
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService),
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
new LogByteSizeMergePolicy()), false)) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
@ -903,7 +898,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
public void testForceMerge() throws IOException {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
try (Store store = createStore();
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService),
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings),
new LogByteSizeMergePolicy()), false)) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
@ -1354,7 +1349,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
public void testEnableGcDeletes() throws Exception {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
try (Store store = createStore();
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService), newMergePolicy()), false)) {
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()), false)) {
engine.config().setEnableGcDeletes(false);
// Add document
@ -1576,7 +1571,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
.put(EngineConfig.INDEX_BUFFER_SIZE_SETTING, "1kb").build();
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), indexSettings);
try (Store store = createStore();
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), createMergeScheduler(indexSettingsService), newMergePolicy()),
Engine engine = new InternalEngine(config(indexSettingsService, store, createTempDir(), new MergeSchedulerConfig(defaultSettings), newMergePolicy()),
false)) {
for (int i = 0; i < 100; i++) {
String id = Integer.toString(i);
@ -1627,7 +1622,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
// now it should be OK.
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings)
.put(EngineConfig.INDEX_FORCE_NEW_TRANSLOG, true).build());
engine = createEngine(indexSettingsService, store, primaryTranslogDir, createMergeScheduler(indexSettingsService), newMergePolicy());
engine = createEngine(indexSettingsService, store, primaryTranslogDir, new MergeSchedulerConfig(defaultSettings), newMergePolicy());
}
public void testTranslogReplayWithFailure() throws IOException {
@ -1880,7 +1875,7 @@ public class InternalEngineTests extends ElasticsearchTestCase {
TranslogConfig translogConfig = new TranslogConfig(shardId, translog.location(), config.getIndexSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettingsService()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeScheduler(),
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);

View File

@ -48,9 +48,8 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.DirectoryService;
@ -197,33 +196,30 @@ public class ShadowEngineTests extends ElasticsearchTestCase {
return new SnapshotDeletionPolicy(createIndexDeletionPolicy());
}
protected MergeSchedulerProvider createMergeScheduler(IndexSettingsService indexSettingsService) {
return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService);
}
protected ShadowEngine createShadowEngine(Store store) {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
return createShadowEngine(indexSettingsService, store, createMergeScheduler(indexSettingsService));
return createShadowEngine(indexSettingsService, store);
}
protected InternalEngine createInternalEngine(Store store, Path translogPath) {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), Settings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
return createInternalEngine(indexSettingsService, store, translogPath, createMergeScheduler(indexSettingsService));
return createInternalEngine(indexSettingsService, store, translogPath);
}
protected ShadowEngine createShadowEngine(IndexSettingsService indexSettingsService, Store store, MergeSchedulerProvider mergeSchedulerProvider) {
return new ShadowEngine(config(indexSettingsService, store, null, mergeSchedulerProvider));
protected ShadowEngine createShadowEngine(IndexSettingsService indexSettingsService, Store store) {
return new ShadowEngine(config(indexSettingsService, store, null, new MergeSchedulerConfig(indexSettingsService.indexSettings())));
}
protected InternalEngine createInternalEngine(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider) {
return new InternalEngine(config(indexSettingsService, store, translogPath, mergeSchedulerProvider), true);
protected InternalEngine createInternalEngine(IndexSettingsService indexSettingsService, Store store, Path translogPath) {
return new InternalEngine(config(indexSettingsService, store, translogPath, new MergeSchedulerConfig(indexSettingsService.indexSettings())), true);
}
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerProvider mergeSchedulerProvider) {
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Path translogPath, MergeSchedulerConfig mergeSchedulerConfig) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettingsService.getSettings(), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)), indexSettingsService
, null, store, createSnapshotDeletionPolicy(),newMergePolicy(), mergeSchedulerProvider,
, null, store, createSnapshotDeletionPolicy(),newMergePolicy(), mergeSchedulerConfig,
iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {

View File

@ -23,6 +23,7 @@ import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@ -31,7 +32,6 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
@ -149,8 +149,8 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT, "1")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT, "2")
.put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL, 0) // get stats all the time - no caching
));
ensureGreen();
@ -321,9 +321,9 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2")
.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, "true")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT, "1")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT, "2")
.put(MergeSchedulerConfig.AUTO_THROTTLE, "true")
));
// Disable auto throttle:
@ -332,7 +332,7 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder()
.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, "no"))
.put(MergeSchedulerConfig.AUTO_THROTTLE, "no"))
.get();
// Make sure we log the change:
@ -340,7 +340,7 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
// Make sure setting says it is in fact changed:
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
assertThat(getSettingsResponse.getSetting("test", ConcurrentMergeSchedulerProvider.AUTO_THROTTLE), equalTo("no"));
assertThat(getSettingsResponse.getSetting("test", MergeSchedulerConfig.AUTO_THROTTLE), equalTo("no"));
} finally {
rootLogger.removeAppender(mockAppender);
rootLogger.setLevel(savedLevel);
@ -365,8 +365,8 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "10000")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT, "10000")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT, "10000")
));
assertFalse(mockAppender.sawUpdateMaxThreadCount);
@ -377,7 +377,7 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder()
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT, "1")
)
.get();
@ -386,7 +386,7 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
// Make sure setting says it is in fact changed:
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
assertThat(getSettingsResponse.getSetting("test", ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT), equalTo("1"));
assertThat(getSettingsResponse.getSetting("test", MergeSchedulerConfig.MAX_THREAD_COUNT), equalTo("1"));
} finally {
rootLogger.removeAppender(mockAppender);

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.stats;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.apache.lucene.util.LuceneTestCase.SuppressCodecs;
import org.apache.lucene.util.Version;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@ -42,7 +43,6 @@ import org.elasticsearch.index.cache.filter.FilterCacheModule;
import org.elasticsearch.index.cache.filter.FilterCacheModule.FilterCacheSettings;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.filter.index.IndexFilterCache;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.store.IndexStore;
@ -287,8 +287,8 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT, "1")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT, "10000")
));
ensureGreen();
long termUpto = 0;
@ -320,8 +320,8 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "1")
.put(MergeSchedulerConfig.MAX_THREAD_COUNT, "1")
.put(MergeSchedulerConfig.MAX_MERGE_COUNT, "1")
.put("index.merge.policy.type", "tiered")
));

View File

@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.impl.client.HttpClients;
import org.elasticsearch.index.shard.MergeSchedulerConfig;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
@ -101,7 +102,6 @@ import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Loading;
import org.elasticsearch.index.mapper.internal.SizeFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.shard.MergePolicyConfig;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@ -475,7 +475,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
if (random.nextBoolean()) {
builder.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, false);
builder.put(MergeSchedulerConfig.AUTO_THROTTLE, false);
}
if (random.nextBoolean()) {
@ -531,8 +531,8 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
case 3:
final int maxThreadCount = RandomInts.randomIntBetween(random, 1, 4);
final int maxMergeCount = RandomInts.randomIntBetween(random, maxThreadCount, maxThreadCount + 4);
builder.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, maxMergeCount);
builder.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, maxThreadCount);
builder.put(MergeSchedulerConfig.MAX_MERGE_COUNT, maxMergeCount);
builder.put(MergeSchedulerConfig.MAX_THREAD_COUNT, maxThreadCount);
break;
}