diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index 60a4913bb5e..31f5eb4a921 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -177,7 +177,7 @@ public class ClusterModule extends AbstractModule { registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE); - registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", Validator.EMPTY); + registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR); registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER); registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, Validator.INTEGER); registerClusterDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, Validator.EMPTY); diff --git a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java index 311c4eca2cc..2ad9defe2a7 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/cat/RestThreadPoolAction.java @@ -288,7 +288,7 @@ public class RestThreadPoolAction extends AbstractCatAction { } } - table.addCell(poolInfo == null ? null : poolInfo.getType()); + table.addCell(poolInfo == null ? null : poolInfo.getThreadPoolType()); table.addCell(poolStats == null ? null : poolStats.getActive()); table.addCell(poolStats == null ? null : poolStats.getThreads()); table.addCell(poolStats == null ? null : poolStats.getQueue()); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 039b46b5c7a..b0d81279b03 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -20,6 +20,8 @@ package org.elasticsearch.threadpool; import org.apache.lucene.util.Counter; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.settings.Validator; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -39,22 +41,11 @@ import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.node.settings.NodeSettingsService; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -86,6 +77,101 @@ public class ThreadPool extends AbstractComponent { public static final String FETCH_SHARD_STORE = "fetch_shard_store"; } + public enum ThreadPoolType { + CACHED("cached"), + DIRECT("direct"), + FIXED("fixed"), + SCALING("scaling"); + + private final String type; + + public String getType() { + return type; + } + + ThreadPoolType(String type) { + this.type = type; + } + + private final static Map TYPE_MAP; + + static { + Map typeMap = new HashMap<>(); + for (ThreadPoolType threadPoolType : ThreadPoolType.values()) { + typeMap.put(threadPoolType.getType(), threadPoolType); + } + TYPE_MAP = Collections.unmodifiableMap(typeMap); + } + + public static ThreadPoolType fromType(String type) { + ThreadPoolType threadPoolType = TYPE_MAP.get(type); + if (threadPoolType == null) { + throw new IllegalArgumentException("no ThreadPoolType for " + type); + } + return threadPoolType; + } + } + + public static Map THREAD_POOL_TYPES; + + static { + HashMap map = new HashMap<>(); + map.put(Names.SAME, ThreadPoolType.DIRECT); + map.put(Names.GENERIC, ThreadPoolType.CACHED); + map.put(Names.LISTENER, ThreadPoolType.FIXED); + map.put(Names.GET, ThreadPoolType.FIXED); + map.put(Names.INDEX, ThreadPoolType.FIXED); + map.put(Names.BULK, ThreadPoolType.FIXED); + map.put(Names.SEARCH, ThreadPoolType.FIXED); + map.put(Names.SUGGEST, ThreadPoolType.FIXED); + map.put(Names.PERCOLATE, ThreadPoolType.FIXED); + map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); + map.put(Names.FLUSH, ThreadPoolType.SCALING); + map.put(Names.REFRESH, ThreadPoolType.SCALING); + map.put(Names.WARMER, ThreadPoolType.SCALING); + map.put(Names.SNAPSHOT, ThreadPoolType.SCALING); + map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); + map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); + map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); + THREAD_POOL_TYPES = Collections.unmodifiableMap(map); + } + + private static void add(Map executorSettings, ExecutorSettingsBuilder builder) { + Settings settings = builder.build(); + String name = settings.get("name"); + executorSettings.put(name, settings); + } + + private static class ExecutorSettingsBuilder { + Map settings = new HashMap<>(); + + public ExecutorSettingsBuilder(String name) { + settings.put("name", name); + settings.put("type", THREAD_POOL_TYPES.get(name).getType()); + } + + public ExecutorSettingsBuilder size(int availableProcessors) { + return add("size", Integer.toString(availableProcessors)); + } + + public ExecutorSettingsBuilder queueSize(int queueSize) { + return add("queue_size", Integer.toString(queueSize)); + } + + public ExecutorSettingsBuilder keepAlive(String keepAlive) { + return add("keep_alive", keepAlive); + } + + private ExecutorSettingsBuilder add(String key, String value) { + settings.put(key, value); + return this; + } + + public Settings build() { + return settingsBuilder().put(settings).build(); + } + } + public static final String THREADPOOL_GROUP = "threadpool."; private volatile Map executors; @@ -102,7 +188,6 @@ public class ThreadPool extends AbstractComponent { static final Executor DIRECT_EXECUTOR = command -> command.run(); - public ThreadPool(String name) { this(Settings.builder().put("name", name).build()); } @@ -112,42 +197,31 @@ public class ThreadPool extends AbstractComponent { assert settings.get("name") != null : "ThreadPool's settings should contain a name"; - Map groupSettings = settings.getGroups(THREADPOOL_GROUP); + Map groupSettings = getThreadPoolSettingsGroup(settings); int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5); int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); Map defaultExecutorTypeSettings = new HashMap<>(); - defaultExecutorTypeSettings.put(Names.GENERIC, settingsBuilder().put("type", "cached").put("keep_alive", "30s").build()); - defaultExecutorTypeSettings.put(Names.INDEX, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 200).build()); - defaultExecutorTypeSettings.put(Names.BULK, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 50).build()); - defaultExecutorTypeSettings.put(Names.GET, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()); - defaultExecutorTypeSettings.put(Names.SEARCH, - settingsBuilder().put("type", "fixed").put("size", ((availableProcessors * 3) / 2) + 1).put("queue_size", 1000).build()); - defaultExecutorTypeSettings.put(Names.SUGGEST, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()); - defaultExecutorTypeSettings.put(Names.PERCOLATE, - settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build()); - defaultExecutorTypeSettings .put(Names.MANAGEMENT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build()); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GENERIC).keepAlive("30s")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SUGGEST).size(availableProcessors).queueSize(1000)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.PERCOLATE).size(availableProcessors).queueSize(1000)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m")); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side - defaultExecutorTypeSettings.put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build()); - defaultExecutorTypeSettings.put(Names.FLUSH, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()); - defaultExecutorTypeSettings.put(Names.REFRESH, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build()); - defaultExecutorTypeSettings.put(Names.WARMER, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()); - defaultExecutorTypeSettings.put(Names.SNAPSHOT, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build()); - defaultExecutorTypeSettings.put(Names.FORCE_MERGE, settingsBuilder().put("type", "fixed").put("size", 1).build()); - defaultExecutorTypeSettings.put(Names.FETCH_SHARD_STARTED, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()); - defaultExecutorTypeSettings.put(Names.FETCH_SHARD_STORE, - settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build()); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FLUSH).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.REFRESH).size(halfProcMaxAt10).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.WARMER).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SNAPSHOT).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1)); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m")); + this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings); Map executors = new HashMap<>(); @@ -163,8 +237,8 @@ public class ThreadPool extends AbstractComponent { executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), Settings.EMPTY)); } - executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, "same"))); - if (!executors.get(Names.GENERIC).info.getType().equals("cached")) { + executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT))); + if (!executors.get(Names.GENERIC).info.getThreadPoolType().equals(ThreadPoolType.CACHED)) { throw new IllegalArgumentException("generic thread pool must be of type cached"); } this.executors = unmodifiableMap(executors); @@ -178,6 +252,12 @@ public class ThreadPool extends AbstractComponent { this.estimatedTimeThread.start(); } + private Map getThreadPoolSettingsGroup(Settings settings) { + Map groupSettings = settings.getGroups(THREADPOOL_GROUP); + validate(groupSettings); + return groupSettings; + } + public void setNodeSettingsService(NodeSettingsService nodeSettingsService) { if(settingsListenerIsSet) { throw new IllegalStateException("the node settings listener was set more then once"); @@ -326,24 +406,28 @@ public class ThreadPool extends AbstractComponent { settings = Settings.Builder.EMPTY_SETTINGS; } Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null; - String type = settings.get("type", previousInfo != null ? previousInfo.getType() : defaultSettings.get("type")); + String type = settings.get("type", previousInfo != null ? previousInfo.getThreadPoolType().getType() : defaultSettings.get("type")); + ThreadPoolType threadPoolType = ThreadPoolType.fromType(type); ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name); - if ("same".equals(type)) { + if (ThreadPoolType.DIRECT == threadPoolType) { if (previousExecutorHolder != null) { logger.debug("updating thread_pool [{}], type [{}]", name, type); } else { logger.debug("creating thread_pool [{}], type [{}]", name, type); } - return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, type)); - } else if ("cached".equals(type)) { + return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, threadPoolType)); + } else if (ThreadPoolType.CACHED == threadPoolType) { + if (!Names.GENERIC.equals(name)) { + throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]"); + } TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); if (previousExecutorHolder != null) { - if ("cached".equals(previousInfo.getType())) { + if (ThreadPoolType.CACHED == previousInfo.getThreadPoolType()) { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); - return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, -1, -1, updatedKeepAlive, null)); + return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, -1, -1, updatedKeepAlive, null)); } return previousExecutorHolder; } @@ -358,13 +442,13 @@ public class ThreadPool extends AbstractComponent { logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); } Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); - return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); - } else if ("fixed".equals(type)) { + return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null)); + } else if (ThreadPoolType.FIXED == threadPoolType) { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null)); if (previousExecutorHolder != null) { - if ("fixed".equals(previousInfo.getType())) { + if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) { SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize()))); if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) { int updatedSize = settings.getAsInt("size", previousInfo.getMax()); @@ -378,7 +462,7 @@ public class ThreadPool extends AbstractComponent { ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedSize); ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); } - return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize)); + return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedSize, updatedSize, null, updatedQueueSize)); } return previousExecutorHolder; } @@ -393,13 +477,13 @@ public class ThreadPool extends AbstractComponent { SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); - return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize)); - } else if ("scaling".equals(type)) { + return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize)); + } else if (ThreadPoolType.SCALING == threadPoolType) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); int defaultMin = defaultSettings.getAsInt("min", 1); int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); if (previousExecutorHolder != null) { - if ("scaling".equals(previousInfo.getType())) { + if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); int updatedMin = settings.getAsInt("min", previousInfo.getMin()); int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax())); @@ -414,7 +498,7 @@ public class ThreadPool extends AbstractComponent { if (previousInfo.getMax() != updatedSize) { ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); } - return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null)); + return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedMin, updatedSize, updatedKeepAlive, null)); } return previousExecutorHolder; } @@ -437,13 +521,13 @@ public class ThreadPool extends AbstractComponent { logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); - return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null)); + return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null)); } throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); } public void updateSettings(Settings settings) { - Map groupSettings = settings.getGroups("threadpool"); + Map groupSettings = getThreadPoolSettingsGroup(settings); if (groupSettings.isEmpty()) { return; } @@ -490,6 +574,20 @@ public class ThreadPool extends AbstractComponent { } } + private void validate(Map groupSettings) { + for (String key : groupSettings.keySet()) { + if (!THREAD_POOL_TYPES.containsKey(key)) { + continue; + } + String type = groupSettings.get(key).get("type"); + ThreadPoolType correctThreadPoolType = THREAD_POOL_TYPES.get(key); + // TODO: the type equality check can be removed after #3760/#6732 are addressed + if (type != null && !correctThreadPoolType.getType().equals(type)) { + throw new IllegalArgumentException("setting " + THREADPOOL_GROUP + key + ".type to " + type + " is not permitted; must be " + correctThreadPoolType.getType()); + } + } + } + /** * A thread pool size can also be unbounded and is represented by -1, which is not supported by SizeValue (which only supports positive numbers) */ @@ -643,7 +741,7 @@ public class ThreadPool extends AbstractComponent { public static class Info implements Streamable, ToXContent { private String name; - private String type; + private ThreadPoolType type; private int min; private int max; private TimeValue keepAlive; @@ -653,15 +751,15 @@ public class ThreadPool extends AbstractComponent { } - public Info(String name, String type) { + public Info(String name, ThreadPoolType type) { this(name, type, -1); } - public Info(String name, String type, int size) { + public Info(String name, ThreadPoolType type, int size) { this(name, type, size, size, null, null); } - public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) { + public Info(String name, ThreadPoolType type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) { this.name = name; this.type = type; this.min = min; @@ -674,7 +772,7 @@ public class ThreadPool extends AbstractComponent { return this.name; } - public String getType() { + public ThreadPoolType getThreadPoolType() { return this.type; } @@ -699,7 +797,7 @@ public class ThreadPool extends AbstractComponent { @Override public void readFrom(StreamInput in) throws IOException { name = in.readString(); - type = in.readString(); + type = ThreadPoolType.fromType(in.readString()); min = in.readInt(); max = in.readInt(); if (in.readBoolean()) { @@ -716,7 +814,7 @@ public class ThreadPool extends AbstractComponent { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(name); - out.writeString(type); + out.writeString(type.getType()); out.writeInt(min); out.writeInt(max); if (keepAlive == null) { @@ -739,7 +837,7 @@ public class ThreadPool extends AbstractComponent { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(name, XContentBuilder.FieldCaseConversion.NONE); - builder.field(Fields.TYPE, type); + builder.field(Fields.TYPE, type.getType()); if (min != -1) { builder.field(Fields.MIN, min); } @@ -814,4 +912,37 @@ public class ThreadPool extends AbstractComponent { return false; } + public static ThreadPoolTypeSettingsValidator THREAD_POOL_TYPE_SETTINGS_VALIDATOR = new ThreadPoolTypeSettingsValidator(); + private static class ThreadPoolTypeSettingsValidator implements Validator { + @Override + public String validate(String setting, String value, ClusterState clusterState) { + // TODO: the type equality validation can be removed after #3760/#6732 are addressed + Matcher matcher = Pattern.compile("threadpool\\.(.*)\\.type").matcher(setting); + if (!matcher.matches()) { + return null; + } else { + String threadPool = matcher.group(1); + ThreadPool.ThreadPoolType defaultThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPool); + ThreadPool.ThreadPoolType threadPoolType; + try { + threadPoolType = ThreadPool.ThreadPoolType.fromType(value); + } catch (IllegalArgumentException e) { + return e.getMessage(); + } + if (defaultThreadPoolType.equals(threadPoolType)) { + return null; + } else { + return String.format( + Locale.ROOT, + "thread pool type for [%s] can only be updated to [%s] but was [%s]", + threadPool, + defaultThreadPoolType.getType(), + threadPoolType.getType() + ); + } + } + + } + } + } diff --git a/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java b/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java index a8c6a194c56..d3e3de5fda1 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java +++ b/core/src/test/java/org/elasticsearch/search/SearchWithRejectionsIT.java @@ -37,7 +37,6 @@ public class SearchWithRejectionsIT extends ESIntegTestCase { @Override public Settings nodeSettings(int nodeOrdinal) { return settingsBuilder().put(super.nodeSettings(nodeOrdinal)) - .put("threadpool.search.type", "fixed") .put("threadpool.search.size", 1) .put("threadpool.search.queue_size", 1) .build(); diff --git a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java index d52f67dc82c..838c2a6d401 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java +++ b/core/src/test/java/org/elasticsearch/threadpool/SimpleThreadPoolIT.java @@ -46,20 +46,13 @@ import java.lang.management.ThreadMXBean; import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.regex.Pattern; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.*; /** */ @@ -67,7 +60,7 @@ import static org.hamcrest.Matchers.sameInstance; public class SimpleThreadPoolIT extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "cached").build(); + return Settings.settingsBuilder().build(); } public void testThreadNames() throws Exception { @@ -130,26 +123,23 @@ public class SimpleThreadPoolIT extends ESIntegTestCase { internalCluster().startNodesAsync(2).get(); ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class); // Check that settings are changed - assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L)); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.keep_alive", "10m").build()).execute().actionGet(); - assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(1000)); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 2000).build()).execute().actionGet(); + assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(2000)); // Make sure that threads continue executing when executor is replaced final CyclicBarrier barrier = new CyclicBarrier(2); Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.executor(Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - barrier.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } catch (BrokenBarrierException ex) { - // - } - } - }); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + threadPool.executor(Names.SEARCH).execute(() -> { + try { + barrier.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (BrokenBarrierException ex) { + // + } + }); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 1000).build()).execute().actionGet(); assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true)); assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true)); @@ -157,24 +147,19 @@ public class SimpleThreadPoolIT extends ESIntegTestCase { barrier.await(10, TimeUnit.SECONDS); // Make sure that new thread executor is functional - threadPool.executor(Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - barrier.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } catch (BrokenBarrierException ex) { - // + threadPool.executor(Names.SEARCH).execute(() -> { + try { + barrier.await(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (BrokenBarrierException ex) { + // + } } - } - }); - client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet(); + ); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 500)).execute().actionGet(); barrier.await(10, TimeUnit.SECONDS); - // This was here: Thread.sleep(200); - // Why? What was it for? - // Check that node info is correct NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().all().execute().actionGet(); for (int i = 0; i < 2; i++) { @@ -182,7 +167,7 @@ public class SimpleThreadPoolIT extends ESIntegTestCase { boolean found = false; for (ThreadPool.Info info : nodeInfo.getThreadPool()) { if (info.getName().equals(Names.SEARCH)) { - assertThat(info.getType(), equalTo("fixed")); + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); found = true; break; } diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java index cb27fd71f9d..3d57c1d5206 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.threadpool; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.Version; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -30,7 +31,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; +import java.io.IOException; import java.util.Map; import static org.elasticsearch.common.settings.Settings.settingsBuilder; @@ -44,9 +47,16 @@ import static org.hamcrest.Matchers.nullValue; */ public class ThreadPoolSerializationTests extends ESTestCase { BytesStreamOutput output = new BytesStreamOutput(); + private ThreadPool.ThreadPoolType threadPoolType; + + @Before + public void setUp() throws Exception { + super.setUp(); + threadPoolType = randomFrom(ThreadPool.ThreadPoolType.values()); + } public void testThatQueueSizeSerializationWorks() throws Exception { - ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k")); + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k")); output.setVersion(Version.CURRENT); info.writeTo(output); @@ -58,7 +68,7 @@ public class ThreadPoolSerializationTests extends ESTestCase { } public void testThatNegativeQueueSizesCanBeSerialized() throws Exception { - ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null); + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null); output.setVersion(Version.CURRENT); info.writeTo(output); @@ -70,7 +80,7 @@ public class ThreadPoolSerializationTests extends ESTestCase { } public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception { - ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null); + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null); XContentBuilder builder = jsonBuilder(); builder.startObject(); info.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -95,7 +105,7 @@ public class ThreadPoolSerializationTests extends ESTestCase { } public void testThatToXContentWritesInteger() throws Exception { - ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k")); + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k")); XContentBuilder builder = jsonBuilder(); builder.startObject(); info.toXContent(builder, ToXContent.EMPTY_PARAMS); @@ -111,4 +121,16 @@ public class ThreadPoolSerializationTests extends ESTestCase { assertThat(map, hasKey("queue_size")); assertThat(map.get("queue_size").toString(), is("1000")); } + + public void testThatThreadPoolTypeIsSerializedCorrectly() throws IOException { + ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType); + output.setVersion(Version.CURRENT); + info.writeTo(output); + + StreamInput input = StreamInput.wrap(output.bytes()); + ThreadPool.Info newInfo = new ThreadPool.Info(); + newInfo.readFrom(input); + + assertThat(newInfo.getThreadPoolType(), is(threadPoolType)); + } } diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTypeSettingsValidatorTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTypeSettingsValidatorTests.java new file mode 100644 index 00000000000..aa3b2a8dec2 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTypeSettingsValidatorTests.java @@ -0,0 +1,54 @@ +package org.elasticsearch.threadpool; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.settings.Validator; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.*; + +import static org.junit.Assert.*; + +public class ThreadPoolTypeSettingsValidatorTests extends ESTestCase { + private Validator validator; + + @Before + public void setUp() throws Exception { + super.setUp(); + validator = ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR; + } + + public void testValidThreadPoolTypeSettings() { + for (Map.Entry entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) { + assertNull(validateSetting(validator, entry.getKey(), entry.getValue().getType())); + } + } + + public void testInvalidThreadPoolTypeSettings() { + for (Map.Entry entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) { + Set set = new HashSet<>(); + set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values())); + set.remove(entry.getValue()); + ThreadPool.ThreadPoolType invalidThreadPoolType = randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()])); + String expectedMessage = String.format( + Locale.ROOT, + "thread pool type for [%s] can only be updated to [%s] but was [%s]", + entry.getKey(), + entry.getValue().getType(), + invalidThreadPoolType.getType()); + String message = validateSetting(validator, entry.getKey(), invalidThreadPoolType.getType()); + assertNotNull(message); + assertEquals(expectedMessage, message); + } + } + + public void testNonThreadPoolTypeSetting() { + String setting = ThreadPool.THREADPOOL_GROUP + randomAsciiOfLength(10) + "foo"; + String value = randomAsciiOfLength(10); + assertNull(validator.validate(setting, value, ClusterState.PROTO)); + } + + private String validateSetting(Validator validator, String threadPoolName, String value) { + return validator.validate(ThreadPool.THREADPOOL_GROUP + threadPoolName + ".type", value, ClusterState.PROTO); + } +} diff --git a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index cd252b60d71..faa08243af0 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -25,22 +25,330 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool.Names; import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.*; /** */ public class UpdateThreadPoolSettingsTests extends ESTestCase { + public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException { + String threadPoolName = randomThreadPoolName(); + ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(settingsBuilder() + .put("name", "testCorrectThreadPoolTypePermittedInSettings") + .put("threadpool." + threadPoolName + ".type", correctThreadPoolType.getType()) + .build()); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), correctThreadPoolType); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + public void testThreadPoolCanNotOverrideThreadPoolType() throws InterruptedException { + String threadPoolName = randomThreadPoolName(); + ThreadPool.ThreadPoolType incorrectThreadPoolType = randomIncorrectThreadPoolType(threadPoolName); + ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool( + settingsBuilder() + .put("name", "testThreadPoolCanNotOverrideThreadPoolType") + .put("threadpool." + threadPoolName + ".type", incorrectThreadPoolType.getType()) + .build()); + terminate(threadPool); + fail("expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertThat( + e.getMessage(), + is("setting threadpool." + threadPoolName + ".type to " + incorrectThreadPoolType.getType() + " is not permitted; must be " + correctThreadPoolType.getType())); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedException { + String threadPoolName = randomThreadPoolName(); + ThreadPool.ThreadPoolType invalidThreadPoolType = randomIncorrectThreadPoolType(threadPoolName); + ThreadPool.ThreadPoolType validThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(settingsBuilder().put("name", "testUpdateSettingsCanNotChangeThreadPoolType").build()); + + + threadPool.updateSettings( + settingsBuilder() + .put("threadpool." + threadPoolName + ".type", invalidThreadPoolType.getType()) + .build() + ); + fail("expected IllegalArgumentException"); + } catch (IllegalArgumentException e) { + assertThat( + e.getMessage(), + is("setting threadpool." + threadPoolName + ".type to " + invalidThreadPoolType.getType() + " is not permitted; must be " + validThreadPoolType.getType())); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + public void testCachedExecutorType() throws InterruptedException { + String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.CACHED); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool( + Settings.settingsBuilder() + .put("name", "testCachedExecutorType").build()); + + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + + threadPool.updateSettings(settingsBuilder() + .put("threadpool." + threadPoolName + ".keep_alive", "10m") + .build()); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(0)); + // Make sure keep alive value changed + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + + // Make sure keep alive value reused + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + + // Change keep alive + Executor oldExecutor = threadPool.executor(threadPoolName); + threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".keep_alive", "1m").build()); + // Make sure keep alive value changed + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); + // Make sure executor didn't change + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); + assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); + + // Set the same keep alive + threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".keep_alive", "1m").build()); + // Make sure keep alive value didn't change + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); + // Make sure executor didn't change + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); + assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + public void testFixedExecutorType() throws InterruptedException { + String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); + ThreadPool threadPool = null; + + try { + threadPool = new ThreadPool(settingsBuilder() + .put("name", "testCachedExecutorType").build()); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + + threadPool.updateSettings(settingsBuilder() + .put("threadpool." + threadPoolName + ".size", "15") + .build()); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + // keep alive does not apply to fixed thread pools + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L)); + + // Put old type back + threadPool.updateSettings(Settings.EMPTY); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + // Make sure keep alive value is not used + assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue()); + // Make sure keep pool size value were reused + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); + + // Change size + Executor oldExecutor = threadPool.executor(threadPoolName); + threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".size", "10").build()); + // Make sure size values changed + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(10)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(10)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(10)); + // Make sure executor didn't change + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); + + // Change queue capacity + threadPool.updateSettings(settingsBuilder() + .put("threadpool." + threadPoolName + ".queue", "500") + .build()); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + public void testScalingExecutorType() throws InterruptedException { + String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(settingsBuilder() + .put("threadpool." + threadPoolName + ".size", 10) + .put("name", "testCachedExecutorType").build()); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(1)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10)); + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(5L)); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + + // Change settings that doesn't require pool replacement + Executor oldExecutor = threadPool.executor(threadPoolName); + threadPool.updateSettings(settingsBuilder() + .put("threadpool." + threadPoolName + ".keep_alive", "10m") + .put("threadpool." + threadPoolName + ".min", "2") + .put("threadpool." + threadPoolName + ".size", "15") + .build()); + assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); + assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(2)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(2)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + // Make sure keep alive value changed + assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); + assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + public void testShutdownNowInterrupts() throws Exception { + String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(Settings.settingsBuilder() + .put("threadpool." + threadPoolName + ".queue_size", 1000) + .put("name", "testCachedExecutorType").build()); + assertEquals(info(threadPool, threadPoolName).getQueueSize().getSingles(), 1000L); + + final CountDownLatch latch = new CountDownLatch(1); + ThreadPoolExecutor oldExecutor = (ThreadPoolExecutor) threadPool.executor(threadPoolName); + threadPool.executor(threadPoolName).execute(() -> { + try { + new CountDownLatch(1).await(); + } catch (InterruptedException ex) { + latch.countDown(); + Thread.currentThread().interrupt(); + } + } + ); + threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".queue_size", 2000).build()); + assertThat(threadPool.executor(threadPoolName), not(sameInstance(oldExecutor))); + assertThat(oldExecutor.isShutdown(), equalTo(true)); + assertThat(oldExecutor.isTerminating(), equalTo(true)); + assertThat(oldExecutor.isTerminated(), equalTo(false)); + threadPool.shutdownNow(); // should interrupt the thread + latch.await(3, TimeUnit.SECONDS); // If this throws then ThreadPool#shutdownNow didn't interrupt + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + public void testCustomThreadPool() throws Exception { + ThreadPool threadPool = null; + try { + threadPool = new ThreadPool(Settings.settingsBuilder() + .put("threadpool.my_pool1.type", "scaling") + .put("threadpool.my_pool2.type", "fixed") + .put("threadpool.my_pool2.size", "1") + .put("threadpool.my_pool2.queue_size", "1") + .put("name", "testCustomThreadPool").build()); + ThreadPoolInfo groups = threadPool.info(); + boolean foundPool1 = false; + boolean foundPool2 = false; + outer: + for (ThreadPool.Info info : groups) { + if ("my_pool1".equals(info.getName())) { + foundPool1 = true; + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); + } else if ("my_pool2".equals(info.getName())) { + foundPool2 = true; + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + assertThat(info.getMin(), equalTo(1)); + assertThat(info.getMax(), equalTo(1)); + assertThat(info.getQueueSize().singles(), equalTo(1l)); + } else { + for (Field field : Names.class.getFields()) { + if (info.getName().equalsIgnoreCase(field.getName())) { + // This is ok it is a default thread pool + continue outer; + } + } + fail("Unexpected pool name: " + info.getName()); + } + } + assertThat(foundPool1, is(true)); + assertThat(foundPool2, is(true)); + + // Updating my_pool2 + Settings settings = Settings.builder() + .put("threadpool.my_pool2.size", "10") + .build(); + threadPool.updateSettings(settings); + + groups = threadPool.info(); + foundPool1 = false; + foundPool2 = false; + outer: + for (ThreadPool.Info info : groups) { + if ("my_pool1".equals(info.getName())) { + foundPool1 = true; + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); + } else if ("my_pool2".equals(info.getName())) { + foundPool2 = true; + assertThat(info.getMax(), equalTo(10)); + assertThat(info.getMin(), equalTo(10)); + assertThat(info.getQueueSize().singles(), equalTo(1l)); + assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); + } else { + for (Field field : Names.class.getFields()) { + if (info.getName().equalsIgnoreCase(field.getName())) { + // This is ok it is a default thread pool + continue outer; + } + } + fail("Unexpected pool name: " + info.getName()); + } + } + assertThat(foundPool1, is(true)); + assertThat(foundPool2, is(true)); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + private void terminateThreadPoolIfNeeded(ThreadPool threadPool) throws InterruptedException { + if (threadPool != null) { + terminate(threadPool); + } + } + private ThreadPool.Info info(ThreadPool threadPool, String name) { for (ThreadPool.Info info : threadPool.info()) { if (info.getName().equals(name)) { @@ -50,247 +358,20 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { return null; } - public void testCachedExecutorType() throws InterruptedException { - ThreadPool threadPool = new ThreadPool( - Settings.settingsBuilder() - .put("threadpool.search.type", "cached") - .put("name","testCachedExecutorType").build()); - - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L)); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Replace with different type - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same")); - assertThat(threadPool.executor(Names.SEARCH), is(ThreadPool.DIRECT_EXECUTOR)); - - // Replace with different type again - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1)); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); - - // Put old type back - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - // Make sure keep alive value reused - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Change keep alive - Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - - // Set the same keep alive - threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build()); - // Make sure keep alive value didn't change - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached")); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - terminate(threadPool); + private String randomThreadPoolName() { + Set threadPoolNames = ThreadPool.THREAD_POOL_TYPES.keySet(); + return randomFrom(threadPoolNames.toArray(new String[threadPoolNames.size()])); } - public void testFixedExecutorType() throws InterruptedException { - ThreadPool threadPool = new ThreadPool(settingsBuilder() - .put("threadpool.search.type", "fixed") - .put("name","testCachedExecutorType").build()); - - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Replace with different type - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .put("threadpool.search.min", "2") - .put("threadpool.search.size", "15") - .build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2)); - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); - - // Put old type back - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "fixed") - .build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed")); - // Make sure keep alive value is not used - assertThat(info(threadPool, Names.SEARCH).getKeepAlive(), nullValue()); - // Make sure keep pool size value were reused - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(15)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); - - // Change size - Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build()); - // Make sure size values changed - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(10)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(10)); - // Make sure executor didn't change - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed")); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - - // Change queue capacity - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.queue", "500") - .build()); - - terminate(threadPool); + private ThreadPool.ThreadPoolType randomIncorrectThreadPoolType(String threadPoolName) { + Set set = new HashSet<>(); + set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values())); + set.remove(ThreadPool.THREAD_POOL_TYPES.get(threadPoolName)); + ThreadPool.ThreadPoolType invalidThreadPoolType = randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()])); + return invalidThreadPoolType; } - public void testScalingExecutorType() throws InterruptedException { - ThreadPool threadPool = new ThreadPool(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.size", 10) - .put("name","testCachedExecutorType").build()); - - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1)); - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10)); - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L)); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - - // Change settings that doesn't require pool replacement - Executor oldExecutor = threadPool.executor(Names.SEARCH); - threadPool.updateSettings(settingsBuilder() - .put("threadpool.search.type", "scaling") - .put("threadpool.search.keep_alive", "10m") - .put("threadpool.search.min", "2") - .put("threadpool.search.size", "15") - .build()); - assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling")); - assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15)); - assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2)); - assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15)); - // Make sure keep alive value changed - assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); - assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor)); - - terminate(threadPool); + private String randomThreadPool(ThreadPool.ThreadPoolType type) { + return randomFrom(ThreadPool.THREAD_POOL_TYPES.entrySet().stream().filter(t -> t.getValue().equals(type)).map(t -> t.getKey()).collect(Collectors.toList())); } - - public void testShutdownNowInterrupts() throws Exception { - ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder() - .put("threadpool.search.type", "cached") - .put("name","testCachedExecutorType").build()); - - final CountDownLatch latch = new CountDownLatch(1); - ThreadPoolExecutor oldExecutor = (ThreadPoolExecutor) threadPool.executor(Names.SEARCH); - threadPool.executor(Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - new CountDownLatch(1).await(); - } catch (InterruptedException ex) { - latch.countDown(); - Thread.currentThread().interrupt(); - } - } - }); - threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()); - assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor))); - assertThat(oldExecutor.isShutdown(), equalTo(true)); - assertThat(oldExecutor.isTerminating(), equalTo(true)); - assertThat(oldExecutor.isTerminated(), equalTo(false)); - threadPool.shutdownNow(); // should interrupt the thread - latch.await(3, TimeUnit.SECONDS); // If this throws then shotdownNow didn't interrupt - terminate(threadPool); - } - - public void testCustomThreadPool() throws Exception { - ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder() - .put("threadpool.my_pool1.type", "cached") - .put("threadpool.my_pool2.type", "fixed") - .put("threadpool.my_pool2.size", "1") - .put("threadpool.my_pool2.queue_size", "1") - .put("name", "testCustomThreadPool").build()); - - ThreadPoolInfo groups = threadPool.info(); - boolean foundPool1 = false; - boolean foundPool2 = false; - outer: for (ThreadPool.Info info : groups) { - if ("my_pool1".equals(info.getName())) { - foundPool1 = true; - assertThat(info.getType(), equalTo("cached")); - } else if ("my_pool2".equals(info.getName())) { - foundPool2 = true; - assertThat(info.getType(), equalTo("fixed")); - assertThat(info.getMin(), equalTo(1)); - assertThat(info.getMax(), equalTo(1)); - assertThat(info.getQueueSize().singles(), equalTo(1l)); - } else { - for (Field field : Names.class.getFields()) { - if (info.getName().equalsIgnoreCase(field.getName())) { - // This is ok it is a default thread pool - continue outer; - } - } - fail("Unexpected pool name: " + info.getName()); - } - } - assertThat(foundPool1, is(true)); - assertThat(foundPool2, is(true)); - - // Updating my_pool2 - Settings settings = Settings.builder() - .put("threadpool.my_pool2.size", "10") - .build(); - threadPool.updateSettings(settings); - - groups = threadPool.info(); - foundPool1 = false; - foundPool2 = false; - outer: for (ThreadPool.Info info : groups) { - if ("my_pool1".equals(info.getName())) { - foundPool1 = true; - assertThat(info.getType(), equalTo("cached")); - } else if ("my_pool2".equals(info.getName())) { - foundPool2 = true; - assertThat(info.getMax(), equalTo(10)); - assertThat(info.getMin(), equalTo(10)); - assertThat(info.getQueueSize().singles(), equalTo(1l)); - assertThat(info.getType(), equalTo("fixed")); - } else { - for (Field field : Names.class.getFields()) { - if (info.getName().equalsIgnoreCase(field.getName())) { - // This is ok it is a default thread pool - continue outer; - } - } - fail("Unexpected pool name: " + info.getName()); - } - } - assertThat(foundPool1, is(true)); - assertThat(foundPool2, is(true)); - terminate(threadPool); - } - } diff --git a/docs/reference/migration/migrate_3_0.asciidoc b/docs/reference/migration/migrate_3_0.asciidoc index e0bdff8ddee..aa602f9d64d 100644 --- a/docs/reference/migration/migrate_3_0.asciidoc +++ b/docs/reference/migration/migrate_3_0.asciidoc @@ -389,4 +389,12 @@ request cache and the field data cache. This setting would arbitrarily pick the first interface not marked as loopback. Instead, specify by address scope (e.g. `_local_,_site_` for all loopback and private network addresses) or by explicit interface names, -hostnames, or addresses. +hostnames, or addresses. + +=== Forbid changing of thread pool types + +Previously, <> could be dynamically adjusted. The thread pool type effectively +controls the backing queue for the thread pool and modifying this is an expert setting with minimal practical benefits +and high risk of being misused. The ability to change the thread pool type for any thread pool has been removed; do note +that it is still possible to adjust relevant thread pool parameters for each of the thread pools (e.g., depending on +the thread pool type, `keep_alive`, `queue_size`, etc.). diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 591277889bc..bfd5474183c 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -9,87 +9,92 @@ of discarded. There are several thread pools, but the important ones include: +`generic`:: + For generic operations (e.g., background node discovery). + Thread pool type is `cached`. + `index`:: - For index/delete operations. Defaults to `fixed` + For index/delete operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `200`. `search`:: - For count/search operations. Defaults to `fixed` + For count/search operations. Thread pool type is `fixed` with a size of `int((# of available_processors * 3) / 2) + 1`, queue_size of `1000`. `suggest`:: - For suggest operations. Defaults to `fixed` + For suggest operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `1000`. `get`:: - For get operations. Defaults to `fixed` + For get operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `1000`. `bulk`:: - For bulk operations. Defaults to `fixed` + For bulk operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `50`. `percolate`:: - For percolate operations. Defaults to `fixed` + For percolate operations. Thread pool type is `fixed` with a size of `# of available processors`, queue_size of `1000`. `snapshot`:: - For snapshot/restore operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`, max at 5. + For snapshot/restore operations. Thread pool type is `scaling` with a + keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`. `warmer`:: - For segment warm-up operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`, max at 5. + For segment warm-up operations. Thread pool type is `scaling` with a + keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`. `refresh`:: - For refresh operations. Defaults to `scaling` with a - keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`, max at 10. + For refresh operations. Thread pool type is `scaling` with a + keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`. `listener`:: Mainly for java client executing of action when listener threaded is set to true. - Default size of `(# of available processors)/2`, max at 10. + Thread pool type is `scaling` with a default size of `min(10, (# of available processors)/2)`. -Changing a specific thread pool can be done by setting its type and -specific type parameters, for example, changing the `index` thread pool -to have more threads: +Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `index` +thread pool to have more threads: [source,js] -------------------------------------------------- threadpool: index: - type: fixed size: 30 -------------------------------------------------- -NOTE: you can update threadpool settings live using - <>. - +NOTE: you can update thread pool settings dynamically using <>. [float] [[types]] === Thread pool types -The following are the types of thread pools that can be used and their -respective parameters: +The following are the types of thread pools and their respective parameters: [float] -==== `cache` +==== `cached` -The `cache` thread pool is an unbounded thread pool that will spawn a -thread if there are pending requests. Here is an example of how to set -it: +The `cached` thread pool is an unbounded thread pool that will spawn a +thread if there are pending requests. This thread pool is used to +prevent requests submitted to this pool from blocking or being +rejected. Unused threads in this thread pool will be terminated after +a keep alive expires (defaults to five minutes). The `cached` thread +pool is reserved for the <> thread pool. + +The `keep_alive` parameter determines how long a thread should be kept +around in the thread pool without doing any work. [source,js] -------------------------------------------------- threadpool: - index: - type: cached + generic: + keep_alive: 2m -------------------------------------------------- [float] @@ -111,7 +116,6 @@ full, it will abort the request. -------------------------------------------------- threadpool: index: - type: fixed size: 30 queue_size: 1000 -------------------------------------------------- @@ -130,7 +134,6 @@ around in the thread pool without it doing any work. -------------------------------------------------- threadpool: warmer: - type: scaling size: 8 keep_alive: 2m -------------------------------------------------- diff --git a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 6cafe4c3b79..4f8e64cc1a0 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -64,10 +64,10 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.indices.IndicesService; @@ -88,7 +88,6 @@ import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.AssertingLocalTransport; import org.elasticsearch.test.transport.MockTransportService; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty.NettyTransport; @@ -98,20 +97,11 @@ import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Random; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.*; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; @@ -119,15 +109,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static junit.framework.Assert.fail; -import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY; -import static org.apache.lucene.util.LuceneTestCase.rarely; -import static org.apache.lucene.util.LuceneTestCase.usually; +import static org.apache.lucene.util.LuceneTestCase.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.test.ESTestCase.assertBusy; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.*; import static org.junit.Assert.assertThat; /** @@ -404,18 +390,6 @@ public final class InternalTestCluster extends TestCluster { if (random.nextBoolean()) { // sometimes set a builder.put(SearchService.DEFAULT_KEEPALIVE_KEY, TimeValue.timeValueSeconds(100 + random.nextInt(5 * 60))); } - if (random.nextBoolean()) { - // change threadpool types to make sure we don't have components that rely on the type of thread pools - for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET, - ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.FORCE_MERGE, - ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT, - ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) { - if (random.nextBoolean()) { - final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling")); - builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type); - } - } - } if (random.nextInt(10) == 0) { // node gets an extra cpu this time