Merge pull request #14367 from jasontedor/forbid-changing-thread-pool-types

Forbid changing thread pool types
This commit is contained in:
Jason Tedor 2015-11-02 21:17:24 -05:00
commit 9cc2eb086e
11 changed files with 687 additions and 430 deletions

View File

@ -177,7 +177,7 @@ public class ClusterModule extends AbstractModule {
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT, Validator.TIME_NON_NEGATIVE);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_SIZE_PER_SEC, Validator.BYTES_SIZE);
registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", Validator.EMPTY);
registerClusterDynamicSetting(ThreadPool.THREADPOOL_GROUP + "*", ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR);
registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, Validator.INTEGER);
registerClusterDynamicSetting(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES, Validator.INTEGER);
registerClusterDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK, Validator.EMPTY);

View File

@ -288,7 +288,7 @@ public class RestThreadPoolAction extends AbstractCatAction {
}
}
table.addCell(poolInfo == null ? null : poolInfo.getType());
table.addCell(poolInfo == null ? null : poolInfo.getThreadPoolType());
table.addCell(poolStats == null ? null : poolStats.getActive());
table.addCell(poolStats == null ? null : poolStats.getThreads());
table.addCell(poolStats == null ? null : poolStats.getQueue());

View File

@ -20,6 +20,8 @@
package org.elasticsearch.threadpool;
import org.apache.lucene.util.Counter;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.stream.StreamInput;
@ -39,22 +41,11 @@ import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.node.settings.NodeSettingsService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -86,6 +77,101 @@ public class ThreadPool extends AbstractComponent {
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
}
public enum ThreadPoolType {
CACHED("cached"),
DIRECT("direct"),
FIXED("fixed"),
SCALING("scaling");
private final String type;
public String getType() {
return type;
}
ThreadPoolType(String type) {
this.type = type;
}
private final static Map<String, ThreadPoolType> TYPE_MAP;
static {
Map<String, ThreadPoolType> typeMap = new HashMap<>();
for (ThreadPoolType threadPoolType : ThreadPoolType.values()) {
typeMap.put(threadPoolType.getType(), threadPoolType);
}
TYPE_MAP = Collections.unmodifiableMap(typeMap);
}
public static ThreadPoolType fromType(String type) {
ThreadPoolType threadPoolType = TYPE_MAP.get(type);
if (threadPoolType == null) {
throw new IllegalArgumentException("no ThreadPoolType for " + type);
}
return threadPoolType;
}
}
public static Map<String, ThreadPoolType> THREAD_POOL_TYPES;
static {
HashMap<String, ThreadPoolType> map = new HashMap<>();
map.put(Names.SAME, ThreadPoolType.DIRECT);
map.put(Names.GENERIC, ThreadPoolType.CACHED);
map.put(Names.LISTENER, ThreadPoolType.FIXED);
map.put(Names.GET, ThreadPoolType.FIXED);
map.put(Names.INDEX, ThreadPoolType.FIXED);
map.put(Names.BULK, ThreadPoolType.FIXED);
map.put(Names.SEARCH, ThreadPoolType.FIXED);
map.put(Names.SUGGEST, ThreadPoolType.FIXED);
map.put(Names.PERCOLATE, ThreadPoolType.FIXED);
map.put(Names.MANAGEMENT, ThreadPoolType.SCALING);
map.put(Names.FLUSH, ThreadPoolType.SCALING);
map.put(Names.REFRESH, ThreadPoolType.SCALING);
map.put(Names.WARMER, ThreadPoolType.SCALING);
map.put(Names.SNAPSHOT, ThreadPoolType.SCALING);
map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED);
map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING);
map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING);
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
}
private static void add(Map<String, Settings> executorSettings, ExecutorSettingsBuilder builder) {
Settings settings = builder.build();
String name = settings.get("name");
executorSettings.put(name, settings);
}
private static class ExecutorSettingsBuilder {
Map<String, String> settings = new HashMap<>();
public ExecutorSettingsBuilder(String name) {
settings.put("name", name);
settings.put("type", THREAD_POOL_TYPES.get(name).getType());
}
public ExecutorSettingsBuilder size(int availableProcessors) {
return add("size", Integer.toString(availableProcessors));
}
public ExecutorSettingsBuilder queueSize(int queueSize) {
return add("queue_size", Integer.toString(queueSize));
}
public ExecutorSettingsBuilder keepAlive(String keepAlive) {
return add("keep_alive", keepAlive);
}
private ExecutorSettingsBuilder add(String key, String value) {
settings.put(key, value);
return this;
}
public Settings build() {
return settingsBuilder().put(settings).build();
}
}
public static final String THREADPOOL_GROUP = "threadpool.";
private volatile Map<String, ExecutorHolder> executors;
@ -102,7 +188,6 @@ public class ThreadPool extends AbstractComponent {
static final Executor DIRECT_EXECUTOR = command -> command.run();
public ThreadPool(String name) {
this(Settings.builder().put("name", name).build());
}
@ -112,42 +197,31 @@ public class ThreadPool extends AbstractComponent {
assert settings.get("name") != null : "ThreadPool's settings should contain a name";
Map<String, Settings> groupSettings = settings.getGroups(THREADPOOL_GROUP);
Map<String, Settings> groupSettings = getThreadPoolSettingsGroup(settings);
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);
Map<String, Settings> defaultExecutorTypeSettings = new HashMap<>();
defaultExecutorTypeSettings.put(Names.GENERIC, settingsBuilder().put("type", "cached").put("keep_alive", "30s").build());
defaultExecutorTypeSettings.put(Names.INDEX,
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 200).build());
defaultExecutorTypeSettings.put(Names.BULK,
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 50).build());
defaultExecutorTypeSettings.put(Names.GET,
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build());
defaultExecutorTypeSettings.put(Names.SEARCH,
settingsBuilder().put("type", "fixed").put("size", ((availableProcessors * 3) / 2) + 1).put("queue_size", 1000).build());
defaultExecutorTypeSettings.put(Names.SUGGEST,
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build());
defaultExecutorTypeSettings.put(Names.PERCOLATE,
settingsBuilder().put("type", "fixed").put("size", availableProcessors).put("queue_size", 1000).build());
defaultExecutorTypeSettings .put(Names.MANAGEMENT, settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", 5).build());
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GENERIC).keepAlive("30s"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SUGGEST).size(availableProcessors).queueSize(1000));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.PERCOLATE).size(availableProcessors).queueSize(1000));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m"));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
defaultExecutorTypeSettings.put(Names.LISTENER, settingsBuilder().put("type", "fixed").put("size", halfProcMaxAt10).build());
defaultExecutorTypeSettings.put(Names.FLUSH,
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build());
defaultExecutorTypeSettings.put(Names.REFRESH,
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt10).build());
defaultExecutorTypeSettings.put(Names.WARMER,
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build());
defaultExecutorTypeSettings.put(Names.SNAPSHOT,
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", halfProcMaxAt5).build());
defaultExecutorTypeSettings.put(Names.FORCE_MERGE, settingsBuilder().put("type", "fixed").put("size", 1).build());
defaultExecutorTypeSettings.put(Names.FETCH_SHARD_STARTED,
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build());
defaultExecutorTypeSettings.put(Names.FETCH_SHARD_STORE,
settingsBuilder().put("type", "scaling").put("keep_alive", "5m").put("size", availableProcessors * 2).build());
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FLUSH).size(halfProcMaxAt5).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.REFRESH).size(halfProcMaxAt10).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.WARMER).size(halfProcMaxAt5).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SNAPSHOT).size(halfProcMaxAt5).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m"));
this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings);
Map<String, ExecutorHolder> executors = new HashMap<>();
@ -163,8 +237,8 @@ public class ThreadPool extends AbstractComponent {
executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), Settings.EMPTY));
}
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, "same")));
if (!executors.get(Names.GENERIC).info.getType().equals("cached")) {
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
if (!executors.get(Names.GENERIC).info.getThreadPoolType().equals(ThreadPoolType.CACHED)) {
throw new IllegalArgumentException("generic thread pool must be of type cached");
}
this.executors = unmodifiableMap(executors);
@ -178,6 +252,12 @@ public class ThreadPool extends AbstractComponent {
this.estimatedTimeThread.start();
}
private Map<String, Settings> getThreadPoolSettingsGroup(Settings settings) {
Map<String, Settings> groupSettings = settings.getGroups(THREADPOOL_GROUP);
validate(groupSettings);
return groupSettings;
}
public void setNodeSettingsService(NodeSettingsService nodeSettingsService) {
if(settingsListenerIsSet) {
throw new IllegalStateException("the node settings listener was set more then once");
@ -326,24 +406,28 @@ public class ThreadPool extends AbstractComponent {
settings = Settings.Builder.EMPTY_SETTINGS;
}
Info previousInfo = previousExecutorHolder != null ? previousExecutorHolder.info : null;
String type = settings.get("type", previousInfo != null ? previousInfo.getType() : defaultSettings.get("type"));
String type = settings.get("type", previousInfo != null ? previousInfo.getThreadPoolType().getType() : defaultSettings.get("type"));
ThreadPoolType threadPoolType = ThreadPoolType.fromType(type);
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name);
if ("same".equals(type)) {
if (ThreadPoolType.DIRECT == threadPoolType) {
if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}]", name, type);
} else {
logger.debug("creating thread_pool [{}], type [{}]", name, type);
}
return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, type));
} else if ("cached".equals(type)) {
return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, threadPoolType));
} else if (ThreadPoolType.CACHED == threadPoolType) {
if (!Names.GENERIC.equals(name)) {
throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]");
}
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
if (previousExecutorHolder != null) {
if ("cached".equals(previousInfo.getType())) {
if (ThreadPoolType.CACHED == previousInfo.getThreadPoolType()) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, -1, -1, updatedKeepAlive, null));
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, -1, -1, updatedKeepAlive, null));
}
return previousExecutorHolder;
}
@ -358,13 +442,13 @@ public class ThreadPool extends AbstractComponent {
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
}
Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) {
return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null));
} else if (ThreadPoolType.FIXED == threadPoolType) {
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null));
if (previousExecutorHolder != null) {
if ("fixed".equals(previousInfo.getType())) {
if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) {
SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize())));
if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) {
int updatedSize = settings.getAsInt("size", previousInfo.getMax());
@ -378,7 +462,7 @@ public class ThreadPool extends AbstractComponent {
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setCorePoolSize(updatedSize);
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
}
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedSize, updatedSize, null, updatedQueueSize));
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedSize, updatedSize, null, updatedQueueSize));
}
return previousExecutorHolder;
}
@ -393,13 +477,13 @@ public class ThreadPool extends AbstractComponent {
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize));
} else if ("scaling".equals(type)) {
return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize));
} else if (ThreadPoolType.SCALING == threadPoolType) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
if (previousExecutorHolder != null) {
if ("scaling".equals(previousInfo.getType())) {
if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
int updatedMin = settings.getAsInt("min", previousInfo.getMin());
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
@ -414,7 +498,7 @@ public class ThreadPool extends AbstractComponent {
if (previousInfo.getMax() != updatedSize) {
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
}
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, type, updatedMin, updatedSize, updatedKeepAlive, null));
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, updatedMin, updatedSize, updatedKeepAlive, null));
}
return previousExecutorHolder;
}
@ -437,13 +521,13 @@ public class ThreadPool extends AbstractComponent {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
}
Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
return new ExecutorHolder(executor, new Info(name, type, min, size, keepAlive, null));
return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null));
}
throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]");
}
public void updateSettings(Settings settings) {
Map<String, Settings> groupSettings = settings.getGroups("threadpool");
Map<String, Settings> groupSettings = getThreadPoolSettingsGroup(settings);
if (groupSettings.isEmpty()) {
return;
}
@ -490,6 +574,20 @@ public class ThreadPool extends AbstractComponent {
}
}
private void validate(Map<String, Settings> groupSettings) {
for (String key : groupSettings.keySet()) {
if (!THREAD_POOL_TYPES.containsKey(key)) {
continue;
}
String type = groupSettings.get(key).get("type");
ThreadPoolType correctThreadPoolType = THREAD_POOL_TYPES.get(key);
// TODO: the type equality check can be removed after #3760/#6732 are addressed
if (type != null && !correctThreadPoolType.getType().equals(type)) {
throw new IllegalArgumentException("setting " + THREADPOOL_GROUP + key + ".type to " + type + " is not permitted; must be " + correctThreadPoolType.getType());
}
}
}
/**
* A thread pool size can also be unbounded and is represented by -1, which is not supported by SizeValue (which only supports positive numbers)
*/
@ -643,7 +741,7 @@ public class ThreadPool extends AbstractComponent {
public static class Info implements Streamable, ToXContent {
private String name;
private String type;
private ThreadPoolType type;
private int min;
private int max;
private TimeValue keepAlive;
@ -653,15 +751,15 @@ public class ThreadPool extends AbstractComponent {
}
public Info(String name, String type) {
public Info(String name, ThreadPoolType type) {
this(name, type, -1);
}
public Info(String name, String type, int size) {
public Info(String name, ThreadPoolType type, int size) {
this(name, type, size, size, null, null);
}
public Info(String name, String type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) {
public Info(String name, ThreadPoolType type, int min, int max, @Nullable TimeValue keepAlive, @Nullable SizeValue queueSize) {
this.name = name;
this.type = type;
this.min = min;
@ -674,7 +772,7 @@ public class ThreadPool extends AbstractComponent {
return this.name;
}
public String getType() {
public ThreadPoolType getThreadPoolType() {
return this.type;
}
@ -699,7 +797,7 @@ public class ThreadPool extends AbstractComponent {
@Override
public void readFrom(StreamInput in) throws IOException {
name = in.readString();
type = in.readString();
type = ThreadPoolType.fromType(in.readString());
min = in.readInt();
max = in.readInt();
if (in.readBoolean()) {
@ -716,7 +814,7 @@ public class ThreadPool extends AbstractComponent {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(type);
out.writeString(type.getType());
out.writeInt(min);
out.writeInt(max);
if (keepAlive == null) {
@ -739,7 +837,7 @@ public class ThreadPool extends AbstractComponent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(name, XContentBuilder.FieldCaseConversion.NONE);
builder.field(Fields.TYPE, type);
builder.field(Fields.TYPE, type.getType());
if (min != -1) {
builder.field(Fields.MIN, min);
}
@ -814,4 +912,37 @@ public class ThreadPool extends AbstractComponent {
return false;
}
public static ThreadPoolTypeSettingsValidator THREAD_POOL_TYPE_SETTINGS_VALIDATOR = new ThreadPoolTypeSettingsValidator();
private static class ThreadPoolTypeSettingsValidator implements Validator {
@Override
public String validate(String setting, String value, ClusterState clusterState) {
// TODO: the type equality validation can be removed after #3760/#6732 are addressed
Matcher matcher = Pattern.compile("threadpool\\.(.*)\\.type").matcher(setting);
if (!matcher.matches()) {
return null;
} else {
String threadPool = matcher.group(1);
ThreadPool.ThreadPoolType defaultThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPool);
ThreadPool.ThreadPoolType threadPoolType;
try {
threadPoolType = ThreadPool.ThreadPoolType.fromType(value);
} catch (IllegalArgumentException e) {
return e.getMessage();
}
if (defaultThreadPoolType.equals(threadPoolType)) {
return null;
} else {
return String.format(
Locale.ROOT,
"thread pool type for [%s] can only be updated to [%s] but was [%s]",
threadPool,
defaultThreadPoolType.getType(),
threadPoolType.getType()
);
}
}
}
}
}

View File

@ -37,7 +37,6 @@ public class SearchWithRejectionsIT extends ESIntegTestCase {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder().put(super.nodeSettings(nodeOrdinal))
.put("threadpool.search.type", "fixed")
.put("threadpool.search.size", 1)
.put("threadpool.search.queue_size", 1)
.build();

View File

@ -46,20 +46,13 @@ import java.lang.management.ThreadMXBean;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.regex.Pattern;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.*;
/**
*/
@ -67,7 +60,7 @@ import static org.hamcrest.Matchers.sameInstance;
public class SimpleThreadPoolIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put("threadpool.search.type", "cached").build();
return Settings.settingsBuilder().build();
}
public void testThreadNames() throws Exception {
@ -130,16 +123,14 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
internalCluster().startNodesAsync(2).get();
ThreadPool threadPool = internalCluster().getDataNodeInstance(ThreadPool.class);
// Check that settings are changed
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(5L));
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.keep_alive", "10m").build()).execute().actionGet();
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(1000));
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 2000).build()).execute().actionGet();
assertThat(((ThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getQueue().remainingCapacity(), equalTo(2000));
// Make sure that threads continue executing when executor is replaced
final CyclicBarrier barrier = new CyclicBarrier(2);
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.executor(Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
threadPool.executor(Names.SEARCH).execute(() -> {
try {
barrier.await();
} catch (InterruptedException ex) {
@ -147,9 +138,8 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
} catch (BrokenBarrierException ex) {
//
}
}
});
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 1000).build()).execute().actionGet();
assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor)));
assertThat(((ThreadPoolExecutor) oldExecutor).isShutdown(), equalTo(true));
assertThat(((ThreadPoolExecutor) oldExecutor).isTerminating(), equalTo(true));
@ -157,9 +147,7 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
barrier.await(10, TimeUnit.SECONDS);
// Make sure that new thread executor is functional
threadPool.executor(Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
threadPool.executor(Names.SEARCH).execute(() -> {
try {
barrier.await();
} catch (InterruptedException ex) {
@ -168,13 +156,10 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
//
}
}
});
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.type", "fixed").build()).execute().actionGet();
);
client().admin().cluster().prepareUpdateSettings().setTransientSettings(settingsBuilder().put("threadpool.search.queue_size", 500)).execute().actionGet();
barrier.await(10, TimeUnit.SECONDS);
// This was here: Thread.sleep(200);
// Why? What was it for?
// Check that node info is correct
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().all().execute().actionGet();
for (int i = 0; i < 2; i++) {
@ -182,7 +167,7 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
boolean found = false;
for (ThreadPool.Info info : nodeInfo.getThreadPool()) {
if (info.getName().equals(Names.SEARCH)) {
assertThat(info.getType(), equalTo("fixed"));
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
found = true;
break;
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.threadpool;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -30,7 +31,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.Map;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -44,9 +47,16 @@ import static org.hamcrest.Matchers.nullValue;
*/
public class ThreadPoolSerializationTests extends ESTestCase {
BytesStreamOutput output = new BytesStreamOutput();
private ThreadPool.ThreadPoolType threadPoolType;
@Before
public void setUp() throws Exception {
super.setUp();
threadPoolType = randomFrom(ThreadPool.ThreadPoolType.values());
}
public void testThatQueueSizeSerializationWorks() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k"));
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k"));
output.setVersion(Version.CURRENT);
info.writeTo(output);
@ -58,7 +68,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
}
public void testThatNegativeQueueSizesCanBeSerialized() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null);
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null);
output.setVersion(Version.CURRENT);
info.writeTo(output);
@ -70,7 +80,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
}
public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null);
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), null);
XContentBuilder builder = jsonBuilder();
builder.startObject();
info.toXContent(builder, ToXContent.EMPTY_PARAMS);
@ -95,7 +105,7 @@ public class ThreadPoolSerializationTests extends ESTestCase {
}
public void testThatToXContentWritesInteger() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k"));
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType, 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("1k"));
XContentBuilder builder = jsonBuilder();
builder.startObject();
info.toXContent(builder, ToXContent.EMPTY_PARAMS);
@ -111,4 +121,16 @@ public class ThreadPoolSerializationTests extends ESTestCase {
assertThat(map, hasKey("queue_size"));
assertThat(map.get("queue_size").toString(), is("1000"));
}
public void testThatThreadPoolTypeIsSerializedCorrectly() throws IOException {
ThreadPool.Info info = new ThreadPool.Info("foo", threadPoolType);
output.setVersion(Version.CURRENT);
info.writeTo(output);
StreamInput input = StreamInput.wrap(output.bytes());
ThreadPool.Info newInfo = new ThreadPool.Info();
newInfo.readFrom(input);
assertThat(newInfo.getThreadPoolType(), is(threadPoolType));
}
}

View File

@ -0,0 +1,54 @@
package org.elasticsearch.threadpool;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.settings.Validator;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.*;
import static org.junit.Assert.*;
public class ThreadPoolTypeSettingsValidatorTests extends ESTestCase {
private Validator validator;
@Before
public void setUp() throws Exception {
super.setUp();
validator = ThreadPool.THREAD_POOL_TYPE_SETTINGS_VALIDATOR;
}
public void testValidThreadPoolTypeSettings() {
for (Map.Entry<String, ThreadPool.ThreadPoolType> entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) {
assertNull(validateSetting(validator, entry.getKey(), entry.getValue().getType()));
}
}
public void testInvalidThreadPoolTypeSettings() {
for (Map.Entry<String, ThreadPool.ThreadPoolType> entry : ThreadPool.THREAD_POOL_TYPES.entrySet()) {
Set<ThreadPool.ThreadPoolType> set = new HashSet<>();
set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values()));
set.remove(entry.getValue());
ThreadPool.ThreadPoolType invalidThreadPoolType = randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()]));
String expectedMessage = String.format(
Locale.ROOT,
"thread pool type for [%s] can only be updated to [%s] but was [%s]",
entry.getKey(),
entry.getValue().getType(),
invalidThreadPoolType.getType());
String message = validateSetting(validator, entry.getKey(), invalidThreadPoolType.getType());
assertNotNull(message);
assertEquals(expectedMessage, message);
}
}
public void testNonThreadPoolTypeSetting() {
String setting = ThreadPool.THREADPOOL_GROUP + randomAsciiOfLength(10) + "foo";
String value = randomAsciiOfLength(10);
assertNull(validator.validate(setting, value, ClusterState.PROTO));
}
private String validateSetting(Validator validator, String threadPoolName, String value) {
return validator.validate(ThreadPool.THREADPOOL_GROUP + threadPoolName + ".type", value, ClusterState.PROTO);
}
}

View File

@ -25,188 +25,231 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.*;
/**
*/
public class UpdateThreadPoolSettingsTests extends ESTestCase {
private ThreadPool.Info info(ThreadPool threadPool, String name) {
for (ThreadPool.Info info : threadPool.info()) {
if (info.getName().equals(name)) {
return info;
public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException {
String threadPoolName = randomThreadPoolName();
ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settingsBuilder()
.put("name", "testCorrectThreadPoolTypePermittedInSettings")
.put("threadpool." + threadPoolName + ".type", correctThreadPoolType.getType())
.build());
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), correctThreadPoolType);
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
return null;
public void testThreadPoolCanNotOverrideThreadPoolType() throws InterruptedException {
String threadPoolName = randomThreadPoolName();
ThreadPool.ThreadPoolType incorrectThreadPoolType = randomIncorrectThreadPoolType(threadPoolName);
ThreadPool.ThreadPoolType correctThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(
settingsBuilder()
.put("name", "testThreadPoolCanNotOverrideThreadPoolType")
.put("threadpool." + threadPoolName + ".type", incorrectThreadPoolType.getType())
.build());
terminate(threadPool);
fail("expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(
e.getMessage(),
is("setting threadpool." + threadPoolName + ".type to " + incorrectThreadPoolType.getType() + " is not permitted; must be " + correctThreadPoolType.getType()));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testUpdateSettingsCanNotChangeThreadPoolType() throws InterruptedException {
String threadPoolName = randomThreadPoolName();
ThreadPool.ThreadPoolType invalidThreadPoolType = randomIncorrectThreadPoolType(threadPoolName);
ThreadPool.ThreadPoolType validThreadPoolType = ThreadPool.THREAD_POOL_TYPES.get(threadPoolName);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settingsBuilder().put("name", "testUpdateSettingsCanNotChangeThreadPoolType").build());
threadPool.updateSettings(
settingsBuilder()
.put("threadpool." + threadPoolName + ".type", invalidThreadPoolType.getType())
.build()
);
fail("expected IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertThat(
e.getMessage(),
is("setting threadpool." + threadPoolName + ".type to " + invalidThreadPoolType.getType() + " is not permitted; must be " + validThreadPoolType.getType()));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testCachedExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.CACHED);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(
Settings.settingsBuilder()
.put("threadpool.search.type", "cached")
.put("name", "testCachedExecutorType").build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "same").build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("same"));
assertThat(threadPool.executor(Names.SEARCH), is(ThreadPool.DIRECT_EXECUTOR));
// Replace with different type again
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.put("threadpool." + threadPoolName + ".keep_alive", "10m")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(1));
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(0));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Put old type back
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "cached").build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
// Make sure keep alive value reused
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L));
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
// Change keep alive
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
Executor oldExecutor = threadPool.executor(threadPoolName);
threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".keep_alive", "1m").build());
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
// Set the same keep alive
threadPool.updateSettings(settingsBuilder().put("threadpool.search.keep_alive", "1m").build());
threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".keep_alive", "1m").build());
// Make sure keep alive value didn't change
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("cached"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
terminate(threadPool);
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testFixedExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(settingsBuilder()
.put("threadpool.search.type", "fixed")
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settingsBuilder()
.put("name", "testCachedExecutorType").build());
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
// Replace with different type
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.put("threadpool." + threadPoolName + ".size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
// keep alive does not apply to fixed thread pools
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L));
// Put old type back
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "fixed")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed"));
threadPool.updateSettings(Settings.EMPTY);
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
// Make sure keep alive value is not used
assertThat(info(threadPool, Names.SEARCH).getKeepAlive(), nullValue());
assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue());
// Make sure keep pool size value were reused
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
// Change size
Executor oldExecutor = threadPool.executor(Names.SEARCH);
threadPool.updateSettings(settingsBuilder().put("threadpool.search.size", "10").build());
Executor oldExecutor = threadPool.executor(threadPoolName);
threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".size", "10").build());
// Make sure size values changed
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(10));
// Make sure executor didn't change
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("fixed"));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
// Change queue capacity
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.queue", "500")
.put("threadpool." + threadPoolName + ".queue", "500")
.build());
terminate(threadPool);
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testScalingExecutorType() throws InterruptedException {
ThreadPool threadPool = new ThreadPool(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.size", 10)
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(settingsBuilder()
.put("threadpool." + threadPoolName + ".size", 10)
.put("name", "testCachedExecutorType").build());
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(1));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(10));
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(5L));
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(1));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(5L));
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
// Change settings that doesn't require pool replacement
Executor oldExecutor = threadPool.executor(Names.SEARCH);
Executor oldExecutor = threadPool.executor(threadPoolName);
threadPool.updateSettings(settingsBuilder()
.put("threadpool.search.type", "scaling")
.put("threadpool.search.keep_alive", "10m")
.put("threadpool.search.min", "2")
.put("threadpool.search.size", "15")
.put("threadpool." + threadPoolName + ".keep_alive", "10m")
.put("threadpool." + threadPoolName + ".min", "2")
.put("threadpool." + threadPoolName + ".size", "15")
.build());
assertThat(info(threadPool, Names.SEARCH).getType(), equalTo("scaling"));
assertThat(threadPool.executor(Names.SEARCH), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, Names.SEARCH).getMin(), equalTo(2));
assertThat(info(threadPool, Names.SEARCH).getMax(), equalTo(15));
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(2));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(2));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
// Make sure keep alive value changed
assertThat(info(threadPool, Names.SEARCH).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(Names.SEARCH)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(threadPool.executor(Names.SEARCH), sameInstance(oldExecutor));
terminate(threadPool);
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testShutdownNowInterrupts() throws Exception {
ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool.search.type", "cached")
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED);
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool." + threadPoolName + ".queue_size", 1000)
.put("name", "testCachedExecutorType").build());
assertEquals(info(threadPool, threadPoolName).getQueueSize().getSingles(), 1000L);
final CountDownLatch latch = new CountDownLatch(1);
ThreadPoolExecutor oldExecutor = (ThreadPoolExecutor) threadPool.executor(Names.SEARCH);
threadPool.executor(Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
ThreadPoolExecutor oldExecutor = (ThreadPoolExecutor) threadPool.executor(threadPoolName);
threadPool.executor(threadPoolName).execute(() -> {
try {
new CountDownLatch(1).await();
} catch (InterruptedException ex) {
@ -214,35 +257,39 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
Thread.currentThread().interrupt();
}
}
});
threadPool.updateSettings(settingsBuilder().put("threadpool.search.type", "fixed").build());
assertThat(threadPool.executor(Names.SEARCH), not(sameInstance(oldExecutor)));
);
threadPool.updateSettings(settingsBuilder().put("threadpool." + threadPoolName + ".queue_size", 2000).build());
assertThat(threadPool.executor(threadPoolName), not(sameInstance(oldExecutor)));
assertThat(oldExecutor.isShutdown(), equalTo(true));
assertThat(oldExecutor.isTerminating(), equalTo(true));
assertThat(oldExecutor.isTerminated(), equalTo(false));
threadPool.shutdownNow(); // should interrupt the thread
latch.await(3, TimeUnit.SECONDS); // If this throws then shotdownNow didn't interrupt
terminate(threadPool);
latch.await(3, TimeUnit.SECONDS); // If this throws then ThreadPool#shutdownNow didn't interrupt
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
public void testCustomThreadPool() throws Exception {
ThreadPool threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool.my_pool1.type", "cached")
ThreadPool threadPool = null;
try {
threadPool = new ThreadPool(Settings.settingsBuilder()
.put("threadpool.my_pool1.type", "scaling")
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
.put("name", "testCustomThreadPool").build());
ThreadPoolInfo groups = threadPool.info();
boolean foundPool1 = false;
boolean foundPool2 = false;
outer: for (ThreadPool.Info info : groups) {
outer:
for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getType(), equalTo("fixed"));
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(info.getMin(), equalTo(1));
assertThat(info.getMax(), equalTo(1));
assertThat(info.getQueueSize().singles(), equalTo(1l));
@ -268,16 +315,17 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
groups = threadPool.info();
foundPool1 = false;
foundPool2 = false;
outer: for (ThreadPool.Info info : groups) {
outer:
for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getMax(), equalTo(10));
assertThat(info.getMin(), equalTo(10));
assertThat(info.getQueueSize().singles(), equalTo(1l));
assertThat(info.getType(), equalTo("fixed"));
assertEquals(info.getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
} else {
for (Field field : Names.class.getFields()) {
if (info.getName().equalsIgnoreCase(field.getName())) {
@ -290,7 +338,40 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
}
assertThat(foundPool1, is(true));
assertThat(foundPool2, is(true));
terminate(threadPool);
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
private void terminateThreadPoolIfNeeded(ThreadPool threadPool) throws InterruptedException {
if (threadPool != null) {
terminate(threadPool);
}
}
private ThreadPool.Info info(ThreadPool threadPool, String name) {
for (ThreadPool.Info info : threadPool.info()) {
if (info.getName().equals(name)) {
return info;
}
}
return null;
}
private String randomThreadPoolName() {
Set<String> threadPoolNames = ThreadPool.THREAD_POOL_TYPES.keySet();
return randomFrom(threadPoolNames.toArray(new String[threadPoolNames.size()]));
}
private ThreadPool.ThreadPoolType randomIncorrectThreadPoolType(String threadPoolName) {
Set<ThreadPool.ThreadPoolType> set = new HashSet<>();
set.addAll(Arrays.asList(ThreadPool.ThreadPoolType.values()));
set.remove(ThreadPool.THREAD_POOL_TYPES.get(threadPoolName));
ThreadPool.ThreadPoolType invalidThreadPoolType = randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()]));
return invalidThreadPoolType;
}
private String randomThreadPool(ThreadPool.ThreadPoolType type) {
return randomFrom(ThreadPool.THREAD_POOL_TYPES.entrySet().stream().filter(t -> t.getValue().equals(type)).map(t -> t.getKey()).collect(Collectors.toList()));
}
}

View File

@ -390,3 +390,11 @@ request cache and the field data cache.
This setting would arbitrarily pick the first interface not marked as loopback. Instead, specify by address
scope (e.g. `_local_,_site_` for all loopback and private network addresses) or by explicit interface names,
hostnames, or addresses.
=== Forbid changing of thread pool types
Previously, <<modules-threadpool,thread pool types>> could be dynamically adjusted. The thread pool type effectively
controls the backing queue for the thread pool and modifying this is an expert setting with minimal practical benefits
and high risk of being misused. The ability to change the thread pool type for any thread pool has been removed; do note
that it is still possible to adjust relevant thread pool parameters for each of the thread pools (e.g., depending on
the thread pool type, `keep_alive`, `queue_size`, etc.).

View File

@ -9,87 +9,92 @@ of discarded.
There are several thread pools, but the important ones include:
`generic`::
For generic operations (e.g., background node discovery).
Thread pool type is `cached`.
`index`::
For index/delete operations. Defaults to `fixed`
For index/delete operations. Thread pool type is `fixed`
with a size of `# of available processors`,
queue_size of `200`.
`search`::
For count/search operations. Defaults to `fixed`
For count/search operations. Thread pool type is `fixed`
with a size of `int((# of available_processors * 3) / 2) + 1`,
queue_size of `1000`.
`suggest`::
For suggest operations. Defaults to `fixed`
For suggest operations. Thread pool type is `fixed`
with a size of `# of available processors`,
queue_size of `1000`.
`get`::
For get operations. Defaults to `fixed`
For get operations. Thread pool type is `fixed`
with a size of `# of available processors`,
queue_size of `1000`.
`bulk`::
For bulk operations. Defaults to `fixed`
For bulk operations. Thread pool type is `fixed`
with a size of `# of available processors`,
queue_size of `50`.
`percolate`::
For percolate operations. Defaults to `fixed`
For percolate operations. Thread pool type is `fixed`
with a size of `# of available processors`,
queue_size of `1000`.
`snapshot`::
For snapshot/restore operations. Defaults to `scaling` with a
keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`, max at 5.
For snapshot/restore operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`.
`warmer`::
For segment warm-up operations. Defaults to `scaling` with a
keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`, max at 5.
For segment warm-up operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a size of `min(5, (# of available processors)/2)`.
`refresh`::
For refresh operations. Defaults to `scaling` with a
keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`, max at 10.
For refresh operations. Thread pool type is `scaling` with a
keep-alive of `5m` and a size of `min(10, (# of available processors)/2)`.
`listener`::
Mainly for java client executing of action when listener threaded is set to true.
Default size of `(# of available processors)/2`, max at 10.
Thread pool type is `scaling` with a default size of `min(10, (# of available processors)/2)`.
Changing a specific thread pool can be done by setting its type and
specific type parameters, for example, changing the `index` thread pool
to have more threads:
Changing a specific thread pool can be done by setting its type-specific parameters; for example, changing the `index`
thread pool to have more threads:
[source,js]
--------------------------------------------------
threadpool:
index:
type: fixed
size: 30
--------------------------------------------------
NOTE: you can update threadpool settings live using
<<cluster-update-settings>>.
NOTE: you can update thread pool settings dynamically using <<cluster-update-settings>>.
[float]
[[types]]
=== Thread pool types
The following are the types of thread pools that can be used and their
respective parameters:
The following are the types of thread pools and their respective parameters:
[float]
==== `cache`
==== `cached`
The `cache` thread pool is an unbounded thread pool that will spawn a
thread if there are pending requests. Here is an example of how to set
it:
The `cached` thread pool is an unbounded thread pool that will spawn a
thread if there are pending requests. This thread pool is used to
prevent requests submitted to this pool from blocking or being
rejected. Unused threads in this thread pool will be terminated after
a keep alive expires (defaults to five minutes). The `cached` thread
pool is reserved for the <<modules-threadpool,`generic`>> thread pool.
The `keep_alive` parameter determines how long a thread should be kept
around in the thread pool without doing any work.
[source,js]
--------------------------------------------------
threadpool:
index:
type: cached
generic:
keep_alive: 2m
--------------------------------------------------
[float]
@ -111,7 +116,6 @@ full, it will abort the request.
--------------------------------------------------
threadpool:
index:
type: fixed
size: 30
queue_size: 1000
--------------------------------------------------
@ -130,7 +134,6 @@ around in the thread pool without it doing any work.
--------------------------------------------------
threadpool:
warmer:
type: scaling
size: 8
keep_alive: 2m
--------------------------------------------------

View File

@ -64,10 +64,10 @@ import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.MockEngineFactoryPlugin;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndicesService;
@ -88,7 +88,6 @@ import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransport;
@ -98,20 +97,11 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
@ -119,15 +109,11 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static junit.framework.Assert.fail;
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
import static org.apache.lucene.util.LuceneTestCase.rarely;
import static org.apache.lucene.util.LuceneTestCase.usually;
import static org.apache.lucene.util.LuceneTestCase.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.elasticsearch.test.ESTestCase.assertBusy;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;
/**
@ -404,18 +390,6 @@ public final class InternalTestCluster extends TestCluster {
if (random.nextBoolean()) { // sometimes set a
builder.put(SearchService.DEFAULT_KEEPALIVE_KEY, TimeValue.timeValueSeconds(100 + random.nextInt(5 * 60)));
}
if (random.nextBoolean()) {
// change threadpool types to make sure we don't have components that rely on the type of thread pools
for (String name : Arrays.asList(ThreadPool.Names.BULK, ThreadPool.Names.FLUSH, ThreadPool.Names.GET,
ThreadPool.Names.INDEX, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.FORCE_MERGE,
ThreadPool.Names.PERCOLATE, ThreadPool.Names.REFRESH, ThreadPool.Names.SEARCH, ThreadPool.Names.SNAPSHOT,
ThreadPool.Names.SUGGEST, ThreadPool.Names.WARMER)) {
if (random.nextBoolean()) {
final String type = RandomPicks.randomFrom(random, Arrays.asList("fixed", "cached", "scaling"));
builder.put(ThreadPool.THREADPOOL_GROUP + name + ".type", type);
}
}
}
if (random.nextInt(10) == 0) {
// node gets an extra cpu this time