Merge pull request #17017 from jasontedor/generic-thread-pool

Actually bound the generic thread pool
This commit is contained in:
Jason Tedor 2016-04-26 08:27:48 -04:00
commit efeec4d096
9 changed files with 495 additions and 185 deletions

View File

@ -26,16 +26,12 @@ import org.elasticsearch.common.settings.Settings;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
*
*/
public class EsExecutors {
/**
@ -62,16 +58,11 @@ public class EsExecutors {
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
// we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder);
queue.executor = executor;
return executor;
}
public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy(), contextHolder);
}
public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) {
BlockingQueue<Runnable> queue;
if (queueCapacity < 0) {
@ -114,6 +105,7 @@ public class EsExecutors {
}
static class EsThreadFactory implements ThreadFactory {
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
@ -133,6 +125,7 @@ public class EsExecutors {
t.setDaemon(true);
return t;
}
}
/**
@ -141,7 +134,6 @@ public class EsExecutors {
private EsExecutors() {
}
static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> {
ThreadPoolExecutor executor;
@ -151,9 +143,17 @@ public class EsExecutors {
@Override
public boolean offer(E e) {
// first try to transfer to a waiting worker thread
if (!tryTransfer(e)) {
// check if there might be spare capacity in the thread
// pool executor
int left = executor.getMaximumPoolSize() - executor.getCorePoolSize();
if (left > 0) {
// reject queuing the task to force the thread pool
// executor to add a worker if it can; combined
// with ForceQueuePolicy, this causes the thread
// pool to always scale up to max pool size and we
// only queue when there is no spare capacity
return false;
} else {
return super.offer(e);
@ -162,6 +162,7 @@ public class EsExecutors {
return true;
}
}
}
/**
@ -184,4 +185,5 @@ public class EsExecutors {
return 0;
}
}
}

View File

@ -91,7 +91,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
}
public enum ThreadPoolType {
CACHED("cached"),
DIRECT("direct"),
FIXED("fixed"),
SCALING("scaling");
@ -125,12 +124,12 @@ public class ThreadPool extends AbstractComponent implements Closeable {
}
}
public static Map<String, ThreadPoolType> THREAD_POOL_TYPES;
public static final Map<String, ThreadPoolType> THREAD_POOL_TYPES;
static {
HashMap<String, ThreadPoolType> map = new HashMap<>();
map.put(Names.SAME, ThreadPoolType.DIRECT);
map.put(Names.GENERIC, ThreadPoolType.CACHED);
map.put(Names.GENERIC, ThreadPoolType.SCALING);
map.put(Names.LISTENER, ThreadPoolType.FIXED);
map.put(Names.GET, ThreadPoolType.FIXED);
map.put(Names.INDEX, ThreadPoolType.FIXED);
@ -153,33 +152,67 @@ public class ThreadPool extends AbstractComponent implements Closeable {
executorSettings.put(name, settings);
}
private static class ExecutorSettingsBuilder {
Map<String, String> settings = new HashMap<>();
private static abstract class ExecutorSettingsBuilder<T extends ExecutorSettingsBuilder<T>> {
public ExecutorSettingsBuilder(String name) {
settings.put("name", name);
settings.put("type", THREAD_POOL_TYPES.get(name).getType());
private final Settings.Builder builder;
protected ExecutorSettingsBuilder(String name, ThreadPoolType threadPoolType) {
if (THREAD_POOL_TYPES.get(name) != threadPoolType) {
throw new IllegalArgumentException("thread pool [" + name + "] must be of type [" + threadPoolType + "]");
}
builder = Settings.builder();
builder.put("name", name);
builder.put("type", threadPoolType.getType());
}
public ExecutorSettingsBuilder size(int availableProcessors) {
return add("size", Integer.toString(availableProcessors));
}
public ExecutorSettingsBuilder queueSize(int queueSize) {
return add("queue_size", Integer.toString(queueSize));
}
public ExecutorSettingsBuilder keepAlive(String keepAlive) {
public T keepAlive(String keepAlive) {
return add("keep_alive", keepAlive);
}
private ExecutorSettingsBuilder add(String key, String value) {
settings.put(key, value);
return this;
public T queueSize(int queueSize) {
return add("queue_size", queueSize);
}
public Settings build() {
return Settings.builder().put(settings).build();
protected T add(String setting, int value) {
return add(setting, Integer.toString(value));
}
protected T add(String setting, String value) {
builder.put(setting, value);
@SuppressWarnings("unchecked") final T executor = (T)this;
return executor;
}
public final Settings build() { return builder.build(); }
}
private static class FixedExecutorSettingsBuilder extends ExecutorSettingsBuilder<FixedExecutorSettingsBuilder> {
public FixedExecutorSettingsBuilder(String name) {
super(name, ThreadPoolType.FIXED);
}
public FixedExecutorSettingsBuilder size(int size) {
return add("size", Integer.toString(size));
}
}
private static class ScalingExecutorSettingsBuilder extends ExecutorSettingsBuilder<ScalingExecutorSettingsBuilder> {
public ScalingExecutorSettingsBuilder(String name) {
super(name, ThreadPoolType.SCALING);
}
public ScalingExecutorSettingsBuilder min(int min) {
return add("min", min);
}
public ScalingExecutorSettingsBuilder size(int size) {
return add("size", size);
}
}
@ -215,25 +248,26 @@ public class ThreadPool extends AbstractComponent implements Closeable {
validate(groupSettings);
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);
int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
Map<String, Settings> defaultExecutorTypeSettings = new HashMap<>();
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GENERIC).size(4 * availableProcessors).keepAlive("30s"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m"));
int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.GENERIC).min(4).size(genericThreadPoolMax).keepAlive("30s"));
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200));
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50));
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000));
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000));
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.MANAGEMENT).min(1).size(5).keepAlive("5m"));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FLUSH).size(halfProcMaxAt5).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.REFRESH).size(halfProcMaxAt10).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.WARMER).size(halfProcMaxAt5).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SNAPSHOT).size(halfProcMaxAt5).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m"));
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10));
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FLUSH).min(1).size(halfProcMaxAt5).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.REFRESH).min(1).size(halfProcMaxAt10).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.WARMER).min(1).size(halfProcMaxAt5).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.SNAPSHOT).min(1).size(halfProcMaxAt5).keepAlive("5m"));
add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.FORCE_MERGE).size(1));
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).min(1).size(availableProcessors * 2).keepAlive("5m"));
add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).min(1).size(availableProcessors * 2).keepAlive("5m"));
this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings);
@ -251,9 +285,6 @@ public class ThreadPool extends AbstractComponent implements Closeable {
}
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
if (!executors.get(Names.GENERIC).info.getThreadPoolType().equals(ThreadPoolType.CACHED)) {
throw new IllegalArgumentException("generic thread pool must be of type cached");
}
this.executors = unmodifiableMap(executors);
this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@ -447,49 +478,23 @@ public class ThreadPool extends AbstractComponent implements Closeable {
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name);
if (ThreadPoolType.DIRECT == threadPoolType) {
if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}]", name, type);
logger.debug("updating thread pool [{}], type [{}]", name, type);
} else {
logger.debug("creating thread_pool [{}], type [{}]", name, type);
logger.debug("creating thread pool [{}], type [{}]", name, type);
}
return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, threadPoolType));
} else if (ThreadPoolType.CACHED == threadPoolType) {
if (!Names.GENERIC.equals(name)) {
throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]");
}
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
if (previousExecutorHolder != null) {
if (ThreadPoolType.CACHED == previousInfo.getThreadPoolType()) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, -1, -1, updatedKeepAlive, null));
}
return previousExecutorHolder;
}
if (previousInfo.getKeepAlive() != null) {
defaultKeepAlive = previousInfo.getKeepAlive();
}
}
TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive);
if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
} else {
logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive);
}
Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null));
} else if (ThreadPoolType.FIXED == threadPoolType) {
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null));
if (previousExecutorHolder != null) {
assert previousInfo != null;
if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) {
SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize())));
if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) {
int updatedSize = applyHardSizeLimit(name, settings.getAsInt("size", previousInfo.getMax()));
if (previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
logger.debug("updating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize);
// if you think this code is crazy: that's because it is!
if (updatedSize > previousInfo.getMax()) {
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize);
@ -511,20 +516,24 @@ public class ThreadPool extends AbstractComponent implements Closeable {
int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize));
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
logger.debug("creating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory, threadContext);
return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize));
} else if (ThreadPoolType.SCALING == threadPoolType) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
final Integer queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", null));
if (queueSize != null) {
throw new IllegalArgumentException("thread pool [" + name + "] of type scaling can not have its queue re-sized but was [" + queueSize + "]");
}
if (previousExecutorHolder != null) {
if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
int updatedMin = settings.getAsInt("min", previousInfo.getMin());
int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax()));
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) {
logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
logger.debug("updating thread pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive);
if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) {
((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS);
}
@ -552,9 +561,9 @@ public class ThreadPool extends AbstractComponent implements Closeable {
int min = settings.getAsInt("min", defaultMin);
int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize));
if (previousExecutorHolder != null) {
logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
logger.debug("updating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
} else {
logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
logger.debug("creating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive);
}
Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext);
return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null));
@ -577,6 +586,32 @@ public class ThreadPool extends AbstractComponent implements Closeable {
return size;
}
/**
* Constrains a value between minimum and maximum values
* (inclusive).
*
* @param value the value to constrain
* @param min the minimum acceptable value
* @param max the maximum acceptable value
* @return min if value is less than min, max if value is greater
* than value, otherwise value
*/
static int boundedBy(int value, int min, int max) {
return Math.min(max, Math.max(min, value));
}
static int halfNumberOfProcessorsMaxFive(int numberOfProcessors) {
return boundedBy((numberOfProcessors + 1) / 2, 1, 5);
}
static int halfNumberOfProcessorsMaxTen(int numberOfProcessors) {
return boundedBy((numberOfProcessors + 1) / 2, 1, 10);
}
static int twiceNumberOfProcessors(int numberOfProcessors) {
return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE);
}
private void updateSettings(Settings settings) {
Map<String, Settings> groupSettings = settings.getAsGroups();
if (groupSettings.isEmpty()) {
@ -969,4 +1004,5 @@ public class ThreadPool extends AbstractComponent implements Closeable {
public ThreadContext getThreadContext() {
return threadContext;
}
}

View File

@ -50,8 +50,7 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
//Have very low pool and queue sizes to overwhelm internal pools easily
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("threadpool.generic.size", 1)
.put("threadpool.generic.queue_size", 1)
.put("threadpool.generic.max", 4)
// don't mess with this one! It's quite sensitive to a low queue size
// (see also ThreadedActionListener which is happily spawning threads even when we already got rejected)
//.put("threadpool.listener.queue_size", 1)

View File

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.test.ESTestCase;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class ESThreadPoolTestCase extends ESTestCase {
protected final ThreadPool.Info info(final ThreadPool threadPool, final String name) {
for (final ThreadPool.Info info : threadPool.info()) {
if (info.getName().equals(name)) {
return info;
}
}
throw new IllegalArgumentException(name);
}
protected final ThreadPoolStats.Stats stats(final ThreadPool threadPool, final String name) {
for (final ThreadPoolStats.Stats stats : threadPool.stats()) {
if (name.equals(stats.getName())) {
return stats;
}
}
throw new IllegalArgumentException(name);
}
protected final void terminateThreadPoolIfNeeded(final ThreadPool threadPool) throws InterruptedException {
if (threadPool != null) {
terminate(threadPool);
}
}
static String randomThreadPool(final ThreadPool.ThreadPoolType type) {
return randomFrom(
ThreadPool.THREAD_POOL_TYPES
.entrySet().stream()
.filter(t -> t.getValue().equals(type))
.map(Map.Entry::getKey)
.collect(Collectors.toList()));
}
}

View File

@ -0,0 +1,245 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
public class ScalingThreadPoolTests extends ESThreadPoolTestCase {
public void testScalingThreadPoolConfiguration() throws InterruptedException {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
final Settings.Builder builder = Settings.builder();
final int min;
if (randomBoolean()) {
min = randomIntBetween(0, 8);
builder.put("threadpool." + threadPoolName + ".min", min);
} else {
min = "generic".equals(threadPoolName) ? 4 : 1; // the defaults
}
final int sizeBasedOnNumberOfProcessors;
if (randomBoolean()) {
final int processors = randomIntBetween(1, 64);
sizeBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors);
builder.put("processors", processors);
} else {
sizeBasedOnNumberOfProcessors = expectedSize(threadPoolName, Math.min(32, Runtime.getRuntime().availableProcessors()));
}
final int expectedSize;
if (sizeBasedOnNumberOfProcessors < min || randomBoolean()) {
expectedSize = randomIntBetween(min, 16);
builder.put("threadpool." + threadPoolName + ".size", expectedSize);
} else {
expectedSize = sizeBasedOnNumberOfProcessors;
}
final long keepAlive;
if (randomBoolean()) {
keepAlive = randomIntBetween(1, 300);
builder.put("threadpool." + threadPoolName + ".keep_alive", keepAlive + "s");
} else {
keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults
}
runScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> {
final Executor executor = threadPool.executor(threadPoolName);
assertThat(executor, instanceOf(EsThreadPoolExecutor.class));
final EsThreadPoolExecutor esThreadPoolExecutor = (EsThreadPoolExecutor)executor;
final ThreadPool.Info info = info(threadPool, threadPoolName);
assertThat(info.getName(), equalTo(threadPoolName));
assertThat(info.getThreadPoolType(), equalTo(ThreadPool.ThreadPoolType.SCALING));
assertThat(info.getKeepAlive().seconds(), equalTo(keepAlive));
assertThat(esThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS), equalTo(keepAlive));
assertNull(info.getQueueSize());
assertThat(esThreadPoolExecutor.getQueue().remainingCapacity(), equalTo(Integer.MAX_VALUE));
assertThat(info.getMin(), equalTo(min));
assertThat(esThreadPoolExecutor.getCorePoolSize(), equalTo(min));
assertThat(info.getMax(), equalTo(expectedSize));
assertThat(esThreadPoolExecutor.getMaximumPoolSize(), equalTo(expectedSize));
});
}
@FunctionalInterface
private interface SizeFunction {
int size(int numberOfProcessors);
}
private int expectedSize(final String threadPoolName, final int numberOfProcessors) {
final Map<String, SizeFunction> sizes = new HashMap<>();
sizes.put(ThreadPool.Names.GENERIC, n -> ThreadPool.boundedBy(4 * n, 128, 512));
sizes.put(ThreadPool.Names.MANAGEMENT, n -> 5);
sizes.put(ThreadPool.Names.FLUSH, ThreadPool::halfNumberOfProcessorsMaxFive);
sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfNumberOfProcessorsMaxTen);
sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfNumberOfProcessorsMaxFive);
sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfNumberOfProcessorsMaxFive);
sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceNumberOfProcessors);
sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceNumberOfProcessors);
return sizes.get(threadPoolName).size(numberOfProcessors);
}
public void testValidDynamicKeepAlive() throws InterruptedException {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> {
final Executor beforeExecutor = threadPool.executor(threadPoolName);
final long seconds = randomIntBetween(1, 300);
clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".keep_alive", seconds + "s"));
final Executor afterExecutor = threadPool.executor(threadPoolName);
assertSame(beforeExecutor, afterExecutor);
final ThreadPool.Info info = info(threadPool, threadPoolName);
assertThat(info.getKeepAlive().seconds(), equalTo(seconds));
});
}
public void testScalingThreadPoolIsBounded() throws InterruptedException {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
final int size = randomIntBetween(32, 512);
final Settings settings = Settings.builder().put("threadpool." + threadPoolName + ".size", size).build();
runScalingThreadPoolTest(settings, (clusterSettings, threadPool) -> {
final CountDownLatch latch = new CountDownLatch(1);
final int numberOfTasks = 2 * size;
final CountDownLatch taskLatch = new CountDownLatch(numberOfTasks);
for (int i = 0; i < numberOfTasks; i++) {
threadPool.executor(threadPoolName).execute(() -> {
try {
latch.await();
taskLatch.countDown();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
});
}
final ThreadPoolStats.Stats stats = stats(threadPool, threadPoolName);
assertThat(stats.getQueue(), equalTo(numberOfTasks - size));
assertThat(stats.getLargest(), equalTo(size));
latch.countDown();
try {
taskLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
public void testScalingThreadPoolThreadsAreTerminatedAfterKeepAlive() throws InterruptedException {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
final Settings settings =
Settings.builder()
.put("threadpool." + threadPoolName + ".size", 128)
.put("threadpool." + threadPoolName + ".keep_alive", "1ms")
.build();
runScalingThreadPoolTest(settings, ((clusterSettings, threadPool) -> {
final CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 128; i++) {
threadPool.executor(threadPoolName).execute(() -> {
try {
latch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
});
}
final int active = stats(threadPool, threadPoolName).getThreads();
assertThat(active, equalTo(128));
latch.countDown();
do {
spinForAtLeastOneMillisecond();
} while (stats(threadPool, threadPoolName).getThreads() > 4);
assertThat(stats(threadPool, threadPoolName).getCompleted(), equalTo(128L));
}));
}
public void testDynamicThreadPoolSize() throws InterruptedException {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> {
final Executor beforeExecutor = threadPool.executor(threadPoolName);
int expectedMin = "generic".equals(threadPoolName) ? 4 : 1;
final int size = randomIntBetween(expectedMin, Integer.MAX_VALUE);
clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".size", size));
final Executor afterExecutor = threadPool.executor(threadPoolName);
assertSame(beforeExecutor, afterExecutor);
final ThreadPool.Info info = info(threadPool, threadPoolName);
assertThat(info.getMin(), equalTo(expectedMin));
assertThat(info.getMax(), equalTo(size));
assertThat(afterExecutor, instanceOf(EsThreadPoolExecutor.class));
final EsThreadPoolExecutor executor = (EsThreadPoolExecutor)afterExecutor;
assertThat(executor.getCorePoolSize(), equalTo(expectedMin));
assertThat(executor.getMaximumPoolSize(), equalTo(size));
});
}
public void testResizingScalingThreadPoolQueue() throws InterruptedException {
final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING);
runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> {
final int size = randomIntBetween(1, Integer.MAX_VALUE);
final IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".queue_size", size)));
assertThat(e, hasToString(
"java.lang.IllegalArgumentException: thread pool [" + threadPoolName +
"] of type scaling can not have its queue re-sized but was [" +
size + "]"));
});
}
public void runScalingThreadPoolTest(
final Settings settings,
final BiConsumer<ClusterSettings, ThreadPool> consumer) throws InterruptedException {
ThreadPool threadPool = null;
try {
final String test = Thread.currentThread().getStackTrace()[2].getMethodName();
final Settings nodeSettings = Settings.builder().put(settings).put("node.name", test).build();
threadPool = new ThreadPool(nodeSettings);
final ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);
consumer.accept(clusterSettings, threadPool);
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
private static Settings settings(final String setting, final int value) {
return settings(setting, Integer.toString(value));
}
private static Settings settings(final String setting, final String value) {
return Settings.builder().put(setting, value).build();
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.CoreMatchers.equalTo;
public class ThreadPoolTests extends ESTestCase {
public void testBoundedByBelowMin() {
int min = randomIntBetween(0, 32);
int max = randomIntBetween(min + 1, 64);
int value = randomIntBetween(Integer.MIN_VALUE, min - 1);
assertThat(ThreadPool.boundedBy(value, min, max), equalTo(min));
}
public void testBoundedByAboveMax() {
int min = randomIntBetween(0, 32);
int max = randomIntBetween(min + 1, 64);
int value = randomIntBetween(max + 1, Integer.MAX_VALUE);
assertThat(ThreadPool.boundedBy(value, min, max), equalTo(max));
}
public void testBoundedByBetweenMinAndMax() {
int min = randomIntBetween(0, 32);
int max = randomIntBetween(min + 1, 64);
int value = randomIntBetween(min, max);
assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value));
}
}

View File

@ -23,7 +23,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.lang.reflect.Field;
@ -46,7 +45,7 @@ import static org.hamcrest.Matchers.sameInstance;
/**
*/
public class UpdateThreadPoolSettingsTests extends ESTestCase {
public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase {
public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException {
String threadPoolName = randomThreadPoolName();
@ -162,56 +161,6 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
}
}
public void testCachedExecutorType() throws InterruptedException {
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.CACHED);
ThreadPool threadPool = null;
try {
Settings nodeSettings = Settings.builder()
.put("node.name", "testCachedExecutorType").build();
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
Settings settings = clusterSettings.applySettings(Settings.builder()
.put("threadpool." + threadPoolName + ".keep_alive", "10m")
.build());
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(0));
// Make sure keep alive value changed
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L));
// Make sure keep alive value reused
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L));
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
// Change keep alive
Executor oldExecutor = threadPool.executor(threadPoolName);
settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".keep_alive", "1m").build());
// Make sure keep alive value changed
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
// Set the same keep alive
settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".keep_alive", "1m").build());
// Make sure keep alive value didn't change
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L));
// Make sure executor didn't change
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
} finally {
terminateThreadPoolIfNeeded(threadPool);
}
}
private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
return Math.min(size, EsExecutors.boundedNumberOfProcessors(settings));
@ -273,7 +222,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));
// Change queue capacity
settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".queue", "500")
clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".queue", "500")
.build());
} finally {
terminateThreadPoolIfNeeded(threadPool);
@ -290,9 +239,11 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
threadPool = new ThreadPool(nodeSettings);
ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool.setClusterSettings(clusterSettings);
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(1));
final int expectedMinimum = "generic".equals(threadPoolName) ? 4 : 1;
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedMinimum));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(5L));
final long expectedKeepAlive = "generic".equals(threadPoolName) ? 30 : 300;
assertThat(info(threadPool, threadPoolName).getKeepAlive().seconds(), equalTo(expectedKeepAlive));
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
@ -358,6 +309,9 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
try {
Settings nodeSettings = Settings.builder()
.put("threadpool.my_pool1.type", "scaling")
.put("threadpool.my_pool1.min", 1)
.put("threadpool.my_pool1.size", EsExecutors.boundedNumberOfProcessors(Settings.EMPTY))
.put("threadpool.my_pool1.keep_alive", "1m")
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
@ -429,21 +383,6 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
}
}
private void terminateThreadPoolIfNeeded(ThreadPool threadPool) throws InterruptedException {
if (threadPool != null) {
terminate(threadPool);
}
}
private ThreadPool.Info info(ThreadPool threadPool, String name) {
for (ThreadPool.Info info : threadPool.info()) {
if (info.getName().equals(name)) {
return info;
}
}
return null;
}
private String randomThreadPoolName() {
Set<String> threadPoolNames = ThreadPool.THREAD_POOL_TYPES.keySet();
return randomFrom(threadPoolNames.toArray(new String[threadPoolNames.size()]));
@ -456,7 +395,4 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
return randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()]));
}
private String randomThreadPool(ThreadPool.ThreadPoolType type) {
return randomFrom(ThreadPool.THREAD_POOL_TYPES.entrySet().stream().filter(t -> t.getValue().equals(type)).map(Map.Entry::getKey).collect(Collectors.toList()));
}
}

View File

@ -11,7 +11,7 @@ There are several thread pools, but the important ones include:
`generic`::
For generic operations (e.g., background node discovery).
Thread pool type is `cached`.
Thread pool type is `scaling`.
`index`::
For index/delete operations. Thread pool type is `fixed`
@ -72,26 +72,6 @@ NOTE: you can update thread pool settings dynamically using <<cluster-update-set
The following are the types of thread pools and their respective parameters:
[float]
==== `cached`
The `cached` thread pool is an unbounded thread pool that will spawn a
thread if there are pending requests. This thread pool is used to
prevent requests submitted to this pool from blocking or being
rejected. Unused threads in this thread pool will be terminated after
a keep alive expires (defaults to five minutes). The `cached` thread
pool is reserved for the <<modules-threadpool,`generic`>> thread pool.
The `keep_alive` parameter determines how long a thread should be kept
around in the thread pool without doing any work.
[source,js]
--------------------------------------------------
threadpool:
generic:
keep_alive: 2m
--------------------------------------------------
[float]
==== `fixed`
@ -118,9 +98,9 @@ threadpool:
[float]
==== `scaling`
The `scaling` thread pool holds a dynamic number of threads. This number is
proportional to the workload and varies between 1 and the value of the
`size` parameter.
The `scaling` thread pool holds a dynamic number of threads. This
number is proportional to the workload and varies between the value of
the `min` and `size` parameters.
The `keep_alive` parameter determines how long a thread should be kept
around in the thread pool without it doing any work.
@ -129,6 +109,7 @@ around in the thread pool without it doing any work.
--------------------------------------------------
threadpool:
warmer:
min: 1
size: 8
keep_alive: 2m
--------------------------------------------------

View File

@ -314,7 +314,7 @@ public final class InternalTestCluster extends TestCluster {
// always reduce this - it can make tests really slow
builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50)));
defaultSettings = builder.build();
executor = EsExecutors.newCached("test runner", 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY));
executor = EsExecutors.newScaling("test runner", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY));
}
public static String configuredNodeMode() {