Core: let Lucene kick off merges

Today, Elasticsearch has a separate merge thread pool that checks once
per second (by default) whether any merges are necessary. This is no
longer needed: because we do our own index throttling, we can and do
now tell Lucene's ConcurrentMergeScheduler never to "hard pause"
threads when merges fall behind.

This change goes back to letting Lucene launch merges as needed, and
removes these two expert settings:

  index.merge.force_async_merge
  index.merge.async_interval

Now merges kick off immediately instead of waiting up to 1 second
before running.
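
As illustration only (a sketch against the Lucene 5.x API, not code
from this change): the "never hard pause" behavior amounts to
overriding ConcurrentMergeScheduler#maybeStall, the hook that would
otherwise block incoming indexing threads when merges fall behind;
returning true lets indexing proceed so our own index throttling can
react instead:

    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;

    // Sketch: never hard-pause indexing threads; merges still run
    // concurrently on the scheduler's own threads, and index-level
    // throttling handles the merges-fell-behind case.
    public class NonStallingMergeScheduler extends ConcurrentMergeScheduler {
        @Override
        protected boolean maybeStall(IndexWriter writer) {
            return true; // true == continue without stalling
        }
    }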

Closes #8643
Michael McCandless, 2014-11-25 04:13:57 -05:00 (committed by mikemccand)
commit 856b294441 (parent c4e2f63b17)
10 changed files with 9 additions and 164 deletions


@@ -57,7 +57,6 @@ Currently available <<modules-threadpool,thread pools>>:
 |`get` |`g` |Thread pool used for <<docs-get,get>> operations
 |`index` |`i` |Thread pool used for <<docs-index_,index>>/<<docs-delete,delete>> operations
 |`management` |`ma` |Thread pool used for management of Elasticsearch (e.g. cluster management)
-|`merge` |`m` |Thread pool used for <<index-modules-merge,merge>> operations
 |`optimize` |`o` |Thread pool used for <<indices-optimize,optimize>> operations
 |`percolate` |`p` |Thread pool used for <<search-percolate,percolator>> operations
 |`refresh` |`r` |Thread pool used for <<indices-refresh,refresh>> operations


@@ -35,8 +35,8 @@
   - match:
       $body: |
-               /^ id \s+ ba \s+ fa \s+ gea \s+ ga \s+ ia \s+ maa \s+ ma \s+ oa \s+ pa \s+ \n
-                  (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/
+               /^ id \s+ ba \s+ fa \s+ gea \s+ ga \s+ ia \s+ maa \s+ oa \s+ pa \s+ \n
+                  (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/

   - do:
       cat.thread_pool:
@@ -98,16 +98,6 @@
               /^ id \s+ management.type \s+ management.active \s+ management.size \s+ management.queue \s+ management.queueSize \s+ management.rejected \s+ management.largest \s+ management.completed \s+ management.min \s+ management.max \s+ management.keepAlive \s+ \n
                  (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/

-  - do:
-      cat.thread_pool:
-        h: id,merge.type,merge.active,merge.size,merge.queue,merge.queueSize,merge.rejected,merge.largest,merge.completed,merge.min,merge.max,merge.keepAlive
-        v: true
-
-  - match:
-      $body: |
-               /^ id \s+ merge.type \s+ merge.active \s+ merge.size \s+ merge.queue \s+ merge.queueSize \s+ merge.rejected \s+ merge.largest \s+ merge.completed \s+ merge.min \s+ merge.max \s+ merge.keepAlive \s+ \n
-                  (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/
-
   - do:
       cat.thread_pool:
         h: id,optimize.type,optimize.active,optimize.size,optimize.queue,optimize.queueSize,optimize.rejected,optimize.largest,optimize.completed,optimize.min,optimize.max,optimize.keepAlive


@@ -1,70 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.index.merge;
-
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.MergeScheduler;
-import org.apache.lucene.index.MergeTrigger;
-
-import java.io.IOException;
-
-/**
- * A wrapper of another {@link org.apache.lucene.index.MergeScheduler} that allows
- * to explicitly enable merge and disable on a thread local basis. The default is
- * to have merges disabled.
- * <p/>
- * This merge scheduler can be used to get around the fact that even though a merge
- * policy can control that no new merges will be created as a result of a segment flush
- * (during indexing operation for example), the {@link #merge(org.apache.lucene.index.IndexWriter, org.apache.lucene.index.MergeTrigger, boolean)}
- * call will still be called, and can result in stalling indexing.
- */
-public class EnableMergeScheduler extends MergeScheduler {
-
-    private final MergeScheduler mergeScheduler;
-
-    public EnableMergeScheduler(MergeScheduler mergeScheduler) {
-        this.mergeScheduler = mergeScheduler;
-    }
-
-    @Override
-    public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException {
-        if (trigger == MergeTrigger.EXPLICIT) {
-            mergeScheduler.merge(writer, trigger, newMergesFound);
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        mergeScheduler.close();
-    }
-
-    @Override
-    public MergeScheduler clone() {
-        // Lucene IW makes a clone internally but since we hold on to this instance
-        // the clone will just be the identity.
-        return this;
-    }
-
-    @Override
-    public String toString() {
-        return "EnableMergeScheduler(" + mergeScheduler + ")";
-    }
-}
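
(Context, as a hedged sketch rather than code from this commit: the
EXPLICIT-only filter above worked because Lucene tags every scheduler
call with a MergeTrigger. Flush-driven calls arrive as
MergeTrigger.SEGMENT_FLUSH and were swallowed, while
IndexWriter.maybeMerge() calls through with MergeTrigger.EXPLICIT, so
the periodic merger further below was the only path that actually ran
merges:)

    import java.io.IOException;

    import org.apache.lucene.index.IndexWriter;

    // Illustrative only: the single call path that reached the wrapped scheduler.
    class ExplicitMergeExample {
        static void requestMerges(IndexWriter writer) throws IOException {
            writer.maybeMerge(); // invokes the MergeScheduler with MergeTrigger.EXPLICIT
        }
    }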


@@ -71,7 +71,7 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
     }

     @Override
-    public MergeScheduler buildMergeScheduler() {
+    public MergeScheduler newMergeScheduler() {
         CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this);
         // which would then stall if there are 2 merges in flight, and unstall once we are back to 1 or 0 merges
         // NOTE: we pass maxMergeCount+1 here so that CMS will allow one too many merges to kick off which then allows
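
(A hedged sketch of the maxMergeCount+1 trick the NOTE describes;
setMaxMergesAndThreads is the real Lucene setter, everything else here
is illustrative rather than the exact Elasticsearch wiring:)

    import org.apache.lucene.index.ConcurrentMergeScheduler;

    class OverAllowedMergeSetup {
        static ConcurrentMergeScheduler configure(int maxMergeCount, int maxThreadCount) {
            ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler();
            // Allow one merge too many: when that extra merge actually starts,
            // the engine can observe that merges have fallen behind and throttle
            // indexing itself, instead of CMS hard-pausing indexing threads.
            cms.setMaxMergesAndThreads(maxMergeCount + 1, maxThreadCount);
            return cms;
        }
    }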


@@ -22,7 +22,6 @@ package org.elasticsearch.index.merge.scheduler;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.MergeScheduler;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.merge.EnableMergeScheduler;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.settings.IndexSettings;
@@ -39,8 +38,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
  */
 public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent {

-    public static final String FORCE_ASYNC_MERGE = "index.merge.force_async_merge";
-
     public static interface FailureListener {
         void onFailedMerge(MergePolicy.MergeException e);
     }
@@ -113,19 +110,10 @@ public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent
         }
     }

-    public final MergeScheduler newMergeScheduler() {
-        MergeScheduler scheduler = buildMergeScheduler();
-        // an internal settings, that would allow us to disable this behavior if really needed
-        if (indexSettings.getAsBoolean(FORCE_ASYNC_MERGE, true)) {
-            scheduler = new EnableMergeScheduler(scheduler);
-        }
-        return scheduler;
-    }
-
     /** Maximum number of allowed running merges before index throttling kicks in. */
     public abstract int getMaxMerges();

-    protected abstract MergeScheduler buildMergeScheduler();
+    public abstract MergeScheduler newMergeScheduler();

     public abstract MergeStats stats();


@@ -162,7 +162,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
     private volatile IndexShardState state;

     private TimeValue refreshInterval;
-    private final TimeValue mergeInterval;

     private volatile ScheduledFuture refreshScheduledFuture;
     private volatile ScheduledFuture mergeScheduleFuture;
@@ -211,7 +210,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
         state = IndexShardState.CREATED;
         this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, engine.defaultRefreshInterval());
-        this.mergeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1));

         indexSettingsService.addListener(applyRefreshSettings);
@@ -916,15 +914,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
         } else {
             logger.debug("scheduled refresher disabled");
         }
-        // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing
-        // so, make sure we periodically call it, this need to be a small enough value so mergine will actually
-        // happen and reduce the number of segments
-        if (mergeInterval.millis() > 0) {
-            mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger());
-            logger.debug("scheduling optimizer / merger every {}", mergeInterval);
-        } else {
-            logger.debug("scheduled optimizer / merger disabled");
-        }
     }

     private Query filterQueryIfNeeded(Query query, String[] types) {
@@ -1013,53 +1002,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
         }
     }

-    class EngineMerger implements Runnable {
-        @Override
-        public void run() {
-            if (!engine().possibleMergeNeeded()) {
-                synchronized (mutex) {
-                    if (state != IndexShardState.CLOSED) {
-                        mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this);
-                    }
-                }
-                return;
-            }
-            threadPool.executor(ThreadPool.Names.MERGE).execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        engine.maybeMerge();
-                    } catch (EngineClosedException e) {
-                        // we are being closed, ignore
-                    } catch (OptimizeFailedEngineException e) {
-                        if (e.getCause() instanceof EngineClosedException) {
-                            // ignore, we are being shutdown
-                        } else if (e.getCause() instanceof InterruptedException) {
-                            // ignore, we are being shutdown
-                        } else if (e.getCause() instanceof ClosedByInterruptException) {
-                            // ignore, we are being shutdown
-                        } else if (e.getCause() instanceof ThreadInterruptedException) {
-                            // ignore, we are being shutdown
-                        } else {
-                            if (state != IndexShardState.CLOSED) {
-                                logger.warn("Failed to perform scheduled engine optimize/merge", e);
-                            }
-                        }
-                    } catch (Exception e) {
-                        if (state != IndexShardState.CLOSED) {
-                            logger.warn("Failed to perform scheduled engine optimize/merge", e);
-                        }
-                    }
-                    synchronized (mutex) {
-                        if (state != IndexShardState.CLOSED) {
-                            mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, EngineMerger.this);
-                        }
-                    }
-                }
-            });
-        }
-    }
-
     private void checkIndex(boolean throwException) throws IndexShardException {
         try {
             checkIndexTook = 0;


@@ -56,7 +56,6 @@ public class RestThreadPoolAction extends AbstractCatAction {
             ThreadPool.Names.GET,
             ThreadPool.Names.INDEX,
             ThreadPool.Names.MANAGEMENT,
-            ThreadPool.Names.MERGE,
             ThreadPool.Names.OPTIMIZE,
             ThreadPool.Names.PERCOLATE,
             ThreadPool.Names.REFRESH,
@@ -73,17 +72,19 @@ public class RestThreadPoolAction extends AbstractCatAction {
             "g",
             "i",
             "ma",
-            "m",
             "o",
             "p",
             "r",
             "s",
             "sn",
+            "sd",
             "su",
             "w"
     };

+    static {
+        assert SUPPORTED_ALIASES.length == SUPPORTED_NAMES.length: "SUPPORTED_NAMES/ALIASES mismatch";
+    }
+
     private final static String[] DEFAULT_THREAD_POOLS = new String[]{
             ThreadPool.Names.BULK,
             ThreadPool.Names.INDEX,


@@ -74,7 +74,6 @@ public class ThreadPool extends AbstractComponent {
         public static final String PERCOLATE = "percolate";
         public static final String MANAGEMENT = "management";
         public static final String FLUSH = "flush";
-        public static final String MERGE = "merge";
         public static final String REFRESH = "refresh";
         public static final String WARMER = "warmer";
         public static final String SNAPSHOT = "snapshot";
@@ -123,7 +122,6 @@ public class ThreadPool extends AbstractComponent {
                 // the assumption here is that the listeners should be very lightweight on the listeners side
                 .put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build())
                 .put(Names.FLUSH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
-                .put(Names.MERGE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
                 .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build())
                 .put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())
                 .put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build())


@@ -513,9 +513,6 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
             builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, mergePolicy.getName());
         }

-        if (random.nextBoolean()) {
-            builder.put(MergeSchedulerProvider.FORCE_ASYNC_MERGE, random.nextBoolean());
-        }
         switch (random.nextInt(4)) {
             case 3:
                 builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class);


@@ -383,7 +383,7 @@ public final class InternalTestCluster extends TestCluster {
         if (random.nextBoolean()) {
             // change threadpool types to make sure we don't have components that rely on the type of thread pools
             for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET,
-                    ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE,
+                    ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.OPTIMIZE,
                     ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT,
                     ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) {
                 if (random.nextBoolean()) {
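
With the polling machinery gone, the flow is plain Lucene: each flush
makes IndexWriter consult the MergePolicy and hand any new merges
straight to the ConcurrentMergeScheduler. A minimal end-to-end sketch
of that default behavior (Lucene 5.x-style API, not code from this
commit):

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.index.ConcurrentMergeScheduler;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.RAMDirectory;

    class LuceneDrivenMerges {
        public static void main(String[] args) throws Exception {
            IndexWriterConfig cfg = new IndexWriterConfig(new StandardAnalyzer());
            cfg.setMergeScheduler(new ConcurrentMergeScheduler());
            try (IndexWriter writer = new IndexWriter(new RAMDirectory(), cfg)) {
                for (int i = 0; i < 100000; i++) {
                    // as flushes produce segments, merges kick off immediately
                    // on the scheduler's threads -- no once-per-second check
                    writer.addDocument(new Document());
                }
            }
        }
    }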