diff --git a/docs/reference/cat/thread_pool.asciidoc b/docs/reference/cat/thread_pool.asciidoc index acea7a95198..4372a761ffe 100644 --- a/docs/reference/cat/thread_pool.asciidoc +++ b/docs/reference/cat/thread_pool.asciidoc @@ -57,7 +57,6 @@ Currently available <>: |`get` |`g` |Thread pool used for <> operations |`index` |`i` |Thread pool used for <>/<> operations |`management` |`ma` |Thread pool used for management of Elasticsearch (e.g. cluster management) -|`merge` |`m` |Thread pool used for <> operations |`optimize` |`o` |Thread pool used for <> operations |`percolate` |`p` |Thread pool used for <> operations |`refresh` |`r` |Thread pool used for <> operations diff --git a/rest-api-spec/test/cat.thread_pool/10_basic.yaml b/rest-api-spec/test/cat.thread_pool/10_basic.yaml index c4ff37f889f..0c8ac5b4028 100755 --- a/rest-api-spec/test/cat.thread_pool/10_basic.yaml +++ b/rest-api-spec/test/cat.thread_pool/10_basic.yaml @@ -35,8 +35,8 @@ - match: $body: | - /^ id \s+ ba \s+ fa \s+ gea \s+ ga \s+ ia \s+ maa \s+ ma \s+ oa \s+ pa \s+ \n - (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ + /^ id \s+ ba \s+ fa \s+ gea \s+ ga \s+ ia \s+ maa \s+ oa \s+ pa \s+ \n + (\S+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d+ \s+ \n)+ $/ - do: cat.thread_pool: @@ -98,16 +98,6 @@ /^ id \s+ management.type \s+ management.active \s+ management.size \s+ management.queue \s+ management.queueSize \s+ management.rejected \s+ management.largest \s+ management.completed \s+ management.min \s+ management.max \s+ management.keepAlive \s+ \n (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - do: - cat.thread_pool: - h: id,merge.type,merge.active,merge.size,merge.queue,merge.queueSize,merge.rejected,merge.largest,merge.completed,merge.min,merge.max,merge.keepAlive - v: true - - - match: - $body: | - /^ id \s+ merge.type \s+ merge.active \s+ merge.size \s+ merge.queue \s+ merge.queueSize \s+ merge.rejected \s+ merge.largest \s+ merge.completed \s+ merge.min \s+ merge.max \s+ merge.keepAlive \s+ \n - (\S+ \s+ (cached|fixed|scaling)? \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d+ \s+ \d+ \s+ \d+ \s+ \d* \s+ \d* \s+ \S* \s+ \n)+ $/ - - do: cat.thread_pool: h: id,optimize.type,optimize.active,optimize.size,optimize.queue,optimize.queueSize,optimize.rejected,optimize.largest,optimize.completed,optimize.min,optimize.max,optimize.keepAlive diff --git a/src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java b/src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java deleted file mode 100644 index 28a15558772..00000000000 --- a/src/main/java/org/elasticsearch/index/merge/EnableMergeScheduler.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.merge; - -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.MergeScheduler; -import org.apache.lucene.index.MergeTrigger; - -import java.io.IOException; - -/** - * A wrapper of another {@link org.apache.lucene.index.MergeScheduler} that allows - * to explicitly enable merge and disable on a thread local basis. The default is - * to have merges disabled. - *

- * This merge scheduler can be used to get around the fact that even though a merge - * policy can control that no new merges will be created as a result of a segment flush - * (during indexing operation for example), the {@link #merge(org.apache.lucene.index.IndexWriter, org.apache.lucene.index.MergeTrigger, boolean)} - * call will still be called, and can result in stalling indexing. - */ -public class EnableMergeScheduler extends MergeScheduler { - - private final MergeScheduler mergeScheduler; - - public EnableMergeScheduler(MergeScheduler mergeScheduler) { - this.mergeScheduler = mergeScheduler; - } - - - @Override - public void merge(IndexWriter writer, MergeTrigger trigger, boolean newMergesFound) throws IOException { - if (trigger == MergeTrigger.EXPLICIT) { - mergeScheduler.merge(writer, trigger, newMergesFound); - } - } - - @Override - public void close() throws IOException { - mergeScheduler.close(); - } - - @Override - public MergeScheduler clone() { - // Lucene IW makes a clone internally but since we hold on to this instance - // the clone will just be the identity. - return this; - } - - @Override - public String toString() { - return "EnableMergeScheduler(" + mergeScheduler + ")"; - } -} diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java index 625099ca692..c73c4de4098 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java @@ -71,7 +71,7 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { } @Override - public MergeScheduler buildMergeScheduler() { + public MergeScheduler newMergeScheduler() { CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this); // which would then stall if there are 2 merges in flight, and unstall once we are back to 1 or 0 merges // NOTE: we pass maxMergeCount+1 here so that CMS will allow one too many merges to kick off which then allows diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java index 86868ded3cd..347b2ba32f1 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.merge.scheduler; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.MergeScheduler; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.merge.EnableMergeScheduler; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.settings.IndexSettings; @@ -39,8 +38,6 @@ import java.util.concurrent.CopyOnWriteArrayList; */ public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent { - public static final String FORCE_ASYNC_MERGE = "index.merge.force_async_merge"; - public static interface FailureListener { void onFailedMerge(MergePolicy.MergeException e); } @@ -113,19 +110,10 @@ public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent } } - public final MergeScheduler newMergeScheduler() { - MergeScheduler scheduler = buildMergeScheduler(); - // an internal settings, that would allow us to disable this behavior if really needed - if (indexSettings.getAsBoolean(FORCE_ASYNC_MERGE, true)) { - scheduler = new EnableMergeScheduler(scheduler); - } - return scheduler; - } - /** Maximum number of allowed running merges before index throttling kicks in. */ public abstract int getMaxMerges(); - protected abstract MergeScheduler buildMergeScheduler(); + public abstract MergeScheduler newMergeScheduler(); public abstract MergeStats stats(); diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index cbf50b99c5b..0d7be7b5556 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -162,7 +162,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private volatile IndexShardState state; private TimeValue refreshInterval; - private final TimeValue mergeInterval; private volatile ScheduledFuture refreshScheduledFuture; private volatile ScheduledFuture mergeScheduleFuture; @@ -211,7 +210,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, engine.defaultRefreshInterval()); - this.mergeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1)); indexSettingsService.addListener(applyRefreshSettings); @@ -916,15 +914,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } else { logger.debug("scheduled refresher disabled"); } - // since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing - // so, make sure we periodically call it, this need to be a small enough value so mergine will actually - // happen and reduce the number of segments - if (mergeInterval.millis() > 0) { - mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, new EngineMerger()); - logger.debug("scheduling optimizer / merger every {}", mergeInterval); - } else { - logger.debug("scheduled optimizer / merger disabled"); - } } private Query filterQueryIfNeeded(Query query, String[] types) { @@ -1013,53 +1002,6 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I } } - class EngineMerger implements Runnable { - @Override - public void run() { - if (!engine().possibleMergeNeeded()) { - synchronized (mutex) { - if (state != IndexShardState.CLOSED) { - mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, this); - } - } - return; - } - threadPool.executor(ThreadPool.Names.MERGE).execute(new Runnable() { - @Override - public void run() { - try { - engine.maybeMerge(); - } catch (EngineClosedException e) { - // we are being closed, ignore - } catch (OptimizeFailedEngineException e) { - if (e.getCause() instanceof EngineClosedException) { - // ignore, we are being shutdown - } else if (e.getCause() instanceof InterruptedException) { - // ignore, we are being shutdown - } else if (e.getCause() instanceof ClosedByInterruptException) { - // ignore, we are being shutdown - } else if (e.getCause() instanceof ThreadInterruptedException) { - // ignore, we are being shutdown - } else { - if (state != IndexShardState.CLOSED) { - logger.warn("Failed to perform scheduled engine optimize/merge", e); - } - } - } catch (Exception e) { - if (state != IndexShardState.CLOSED) { - logger.warn("Failed to perform scheduled engine optimize/merge", e); - } - } - synchronized (mutex) { - if (state != IndexShardState.CLOSED) { - mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.SAME, EngineMerger.this); - } - } - } - }); - } - } - private void checkIndex(boolean throwException) throws IndexShardException { try { checkIndexTook = 0; diff --git a/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java index bc423ce90bd..cdde7a109eb 100644 --- a/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java +++ b/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -56,7 +56,6 @@ public class RestThreadPoolAction extends AbstractCatAction { ThreadPool.Names.GET, ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, - ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE, ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, @@ -73,17 +72,19 @@ public class RestThreadPoolAction extends AbstractCatAction { "g", "i", "ma", - "m", "o", "p", "r", "s", "sn", - "sd", "su", "w" }; + static { + assert SUPPORTED_ALIASES.length == SUPPORTED_NAMES.length: "SUPPORTED_NAMES/ALIASES mismatch"; + } + private final static String[] DEFAULT_THREAD_POOLS = new String[]{ ThreadPool.Names.BULK, ThreadPool.Names.INDEX, diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 13450d80d1b..2bad19b1713 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -74,7 +74,6 @@ public class ThreadPool extends AbstractComponent { public static final String PERCOLATE = "percolate"; public static final String MANAGEMENT = "management"; public static final String FLUSH = "flush"; - public static final String MERGE = "merge"; public static final String REFRESH = "refresh"; public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; @@ -123,7 +122,6 @@ public class ThreadPool extends AbstractComponent { // the assumption here is that the listeners should be very lightweight on the listeners side .put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build()) .put(Names.FLUSH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) - .put(Names.MERGE, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .put(Names.REFRESH, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build()) .put(Names.WARMER, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) .put(Names.SNAPSHOT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()) diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index e283ecffa7f..d82dd98f028 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -513,9 +513,6 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase builder.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, mergePolicy.getName()); } - if (random.nextBoolean()) { - builder.put(MergeSchedulerProvider.FORCE_ASYNC_MERGE, random.nextBoolean()); - } switch (random.nextInt(4)) { case 3: builder.put(MergeSchedulerModule.MERGE_SCHEDULER_TYPE_KEY, ConcurrentMergeSchedulerProvider.class); diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index c441d6e1bc7..30b8948815e 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -383,7 +383,7 @@ public final class InternalTestCluster extends TestCluster { if (random.nextBoolean()) { // change threadpool types to make sure we don't have components that rely on the type of thread pools for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET, - ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.MERGE, ThreadPool.Names.OPTIMIZE, + ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.OPTIMIZE, ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { if (random.nextBoolean()) {