From 856b294441edcc73ca8aa2e472c8c0094b6822b9 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Tue, 25 Nov 2014 04:13:57 -0500 Subject: [PATCH] Core: let Lucene kick off merges Today, Elasticsearch has a separate merge thread pool checking once per second (by default) if any merges are necessary, but this is no longer necessary since we can and do now tell Lucene's ConcurrentMergeScheduler never to "hard pause" threads when merges fall behind, since we do our own index throttling. This change goes back to letting Lucene launch merges as needed, and removes these two expert settings: index.merge.force_async_merge index.merge.async_interval Now merges kick off immediately instead of waiting up to 1 second before running. Closes #8643 --- docs/reference/cat/thread_pool.asciidoc | 1 - .../test/cat.thread_pool/10_basic.yaml | 14 +--- .../index/merge/EnableMergeScheduler.java | 70 ------------------- .../ConcurrentMergeSchedulerProvider.java | 2 +- .../scheduler/MergeSchedulerProvider.java | 14 +--- .../shard/service/InternalIndexShard.java | 58 --------------- .../rest/action/cat/RestThreadPoolAction.java | 7 +- .../elasticsearch/threadpool/ThreadPool.java | 2 - .../test/ElasticsearchIntegrationTest.java | 3 - .../test/InternalTestCluster.java | 2 +- 10 files changed, 9 insertions(+), 164 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java diff --git a/docs/reference/cat/thread_pool.asciidoc b/docs/reference/cat/thread_pool.asciidoc index acea7a95198..4372a761ffe 100644 --- a/docs/reference/cat/thread_pool.asciidoc +++ b/docs/reference/cat/thread_pool.asciidoc @@ -57,7 +57,6 @@ Currently available <>: |`get` |`g` |Thread pool used for <> operations |`index` |`i` |Thread pool used for <>/<> operations |`management` |`ma` |Thread pool used for management of Elasticsearch (e.g. cluster management) -|`merge` |`m` |Thread pool used for <> operations |`optimize` |`o` |Thread pool used for <> operations |`percolate` |`p` |Thread pool used for <> operations |`refresh` |`r` |Thread pool used for <> operations diff --git a/rest-api-spec/test/cat.thread_pool/10_basic.yaml b/rest-api-spec/test/cat.thread_pool/10_basic.yaml index c4ff37f889f..0c8ac5b4028 100755 --- a/rest-api-spec/test/cat.thread_pool/10_basic.yaml +++ b/rest-api-spec/test/cat.thread_pool/10_basic.yaml @@ -35,8 +35,8 @@ - match: $body: | - /^ id \s+ ba \s+ fa \s+ gea \s+ ga \s+ ia \s+ maa \s+ ma \s+ oa \s+ pa \s+ \n - (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ + /^ id \s+ ba \s+ fa \s+ gea \s+ ga \s+ ia \s+ maa \s+ oa \s+ pa \s+ \n + (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ - do: cat.thread_pool: @@ -98,16 +98,6 @@ /^ id \s+ management.type \s+ management.active \s+ management.size \s+ management.queue \s+ management.queueSize \s+ management.rejected \s+ management.largest \s+ management.completed \s+ management.min \s+ management.max \s+ management.keepAlive \s+ \n (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - do: - cat.thread_pool: - h: id,merge.type,merge.active,merge.size,merge.queue,merge.queueSize,merge.rejected,merge.largest,merge.completed,merge.min,merge.max,merge.keepAlive - v: true - - - match: - $body: | - /^ id \s+ merge.type \s+ merge.active \s+ merge.size \s+ merge.queue \s+ merge.queueSize \s+ merge.rejected \s+ merge.largest \s+ merge.completed \s+ merge.min \s+ merge.max \s+ merge.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - do: cat.thread_pool: h: id,optimize.type,optimize.active,optimize.size,optimize.queue,optimize.queueSize,optimize.rejected,optimize.largest,optimize.completed,optimize.min,optimize.max,optimize.keepAlive diff --git a/src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java b/src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java deleted file mode 100644 index 28a15558772..00000000000 --- a/src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.merge; - -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.MergeScheduler; -import org.apache.lucene.index.MergeTrigger; - -import java.io.IOException; - -/** - * A wrapper of another {@link org.apache.lucene.index.MergeScheduler} that allows - * to explicitly enable merge and disable on a thread local basis. The default is - * to have merges disabled. - *

- * This merge scheduler can be used to get around the fact that even though a merge - * policy can control that no new merges will be created as a result of a segment flush - * (during indexing operation for example), the {@link #merge(org.apache.lucene.index.IndexWriter, org.apache.lucene.index.MergeTrigger, boolean)} - * call will still be called, and can result in stalling indexing. - */ -public class EnableMergeScheduler extends MergeScheduler { - - private final MergeScheduler mergeScheduler; - - public EnableMergeScheduler(MergeScheduler mergeScheduler) { - this.mergeScheduler = mergeScheduler; - } - - - @Override - public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { - if (trigger == MergeTrigger.EXPLICIT) { - mergeScheduler.merge(writer, trigger, newMergesFound); - } - } - - @Override - public void close() throws IOException { - mergeScheduler.close(); - } - - @Override - public MergeScheduler clone() { - // Lucene IW makes a clone internally but since we hold on to this instance - // the clone will just be the identity. - return this; - } - - @Override - public String toString() { - return "EnableMergeScheduler(" + mergeScheduler + ")"; - } -} diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java index 625099ca692..c73c4de4098 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java @@ -71,7 +71,7 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { } @Override - public MergeScheduler buildMergeScheduler() { + public MergeScheduler newMergeScheduler() { CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this); // which would then stall if there are 2 merges in flight, and unstall once we are back to 1 or 0 merges // NOTE: we pass maxMergeCount+1 here so that CMS will allow one too many merges to kick off which then allows diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java index 86868ded3cd..347b2ba32f1 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.merge.scheduler; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.MergeScheduler; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.merge.EnableMergeScheduler; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.settings.IndexSettings; @@ -39,8 +38,6 @@ import java.util.concurrent.CopyOnWriteArrayList; */ public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent { - public static final String FORCE_ASYNC_MERGE = "index.merge.force_async_merge"; - public static interface FailureListener { void onFailedMerge(MergePolicy.MergeException e); } @@ -113,19 +110,10 @@ public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent } } - public final MergeScheduler newMergeScheduler() { - MergeScheduler scheduler = buildMergeScheduler(); - // an internal settings, that would allow us to disable this behavior if really needed - if (indexSettings.getAsBoolean(FORCE_ASYNC_MERGE, true)) { - scheduler = new EnableMergeScheduler(scheduler); - } - return scheduler; - } - /** Maximum number of allowed running merges before index throttling kicks in. */ public abstract int getMaxMerges(); - protected abstract MergeScheduler buildMergeScheduler(); + public abstract MergeScheduler newMergeScheduler(); public abstract MergeStats stats(); diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index cbf50b99c5b..0d7be7b5556 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -162,7 +162,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private volatile IndexShardState state; private TimeValue refreshInterval; - private final TimeValue mergeInterval; private volatile ScheduledFuture refreshScheduledFuture; private volatile ScheduledFuture mergeScheduleFuture; @@ -211,7 +210,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, engine.defaultRefreshInterval()); - this.mergeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1)); indexSettingsService.addListener(applyRefreshSettings); @@ -916,15 +914,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } else { logger.debug("scheduled refresher disabled"); } - // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing - // so, make sure we periodically call it, this need to be a small enough value so mergine will actually - // happen and reduce the number of segments - if (mergeInterval.millis() > 0) { - mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger()); - logger.debug("scheduling optimizer / merger every {}", mergeInterval); - } else { - logger.debug("scheduled optimizer / merger disabled"); - } } private Query filterQueryIfNeeded(Query query, String[] types) { @@ -1013,53 +1002,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } } - class EngineMerger implements Runnable { - @Override - public void run() { - if (!engine().possibleMergeNeeded()) { - synchronized (mutex) { - if (state != IndexShardState.CLOSED) { - mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this); - } - } - return; - } - threadPool.executor(ThreadPool.Names.MERGE).execute(new Runnable() { - @Override - public void run() { - try { - engine.maybeMerge(); - } catch (EngineClosedException e) { - // we are being closed, ignore - } catch (OptimizeFailedEngineException e) { - if (e.getCause() instanceof EngineClosedException) { - // ignore, we are being shutdown - } else if (e.getCause() instanceof InterruptedException) { - // ignore, we are being shutdown - } else if (e.getCause() instanceof ClosedByInterruptException) { - // ignore, we are being shutdown - } else if (e.getCause() instanceof ThreadInterruptedException) { - // ignore, we are being shutdown - } else { - if (state != IndexShardState.CLOSED) { - logger.warn("Failed to perform scheduled engine optimize/merge", e); - } - } - } catch (Exception e) { - if (state != IndexShardState.CLOSED) { - logger.warn("Failed to perform scheduled engine optimize/merge", e); - } - } - synchronized (mutex) { - if (state != IndexShardState.CLOSED) { - mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, EngineMerger.this); - } - } - } - }); - } - } - private void checkIndex(boolean throwException) throws IndexShardException { try { checkIndexTook = 0; diff --git a/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java index bc423ce90bd..cdde7a109eb 100644 --- a/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java +++ b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -56,7 +56,6 @@ public class RestThreadPoolAction extends AbstractCatAction { ThreadPool.Names.GET, ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, - ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE, ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, @@ -73,17 +72,19 @@ public class RestThreadPoolAction extends AbstractCatAction { "g", "i", "ma", - "m", "o", "p", "r", "s", "sn", - "sd", "su", "w" }; + static { + assert SUPPORTED_ALIASES.length == SUPPORTED_NAMES.length: "SUPPORTED_NAMES/ALIASES mismatch"; + } + private final static String[] DEFAULT_THREAD_POOLS = new String[]{ ThreadPool.Names.BULK, ThreadPool.Names.INDEX, diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 13450d80d1b..2bad19b1713 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -74,7 +74,6 @@ public class ThreadPool extends AbstractComponent { public static final String PERCOLATE = "percolate"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; - public static final String MERGE = "merge"; public static final String REFRESH = "refresh"; public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; @@ -123,7 +122,6 @@ public class ThreadPool extends AbstractComponent { // the assumption here is that the listeners should be very lightweight on the listeners side .put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build()) .put(Names.FLUSH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) - .put(Names.MERGE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build()) .put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index e283ecffa7f..d82dd98f028 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -513,9 +513,6 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, mergePolicy.getName()); } - if (random.nextBoolean()) { - builder.put(MergeSchedulerProvider.FORCE_ASYNC_MERGE, random.nextBoolean()); - } switch (random.nextInt(4)) { case 3: builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class); diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index c441d6e1bc7..30b8948815e 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -383,7 +383,7 @@ public final class InternalTestCluster extends TestCluster { if (random.nextBoolean()) { // change threadpool types to make sure we don't have components that rely on the type of thread pools for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET, - ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE, + ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.OPTIMIZE, ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { if (random.nextBoolean()) {