From 5608fa7ac18897b9f4d5a69bde073fc47b8ad4d0 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 8 Mar 2016 13:39:43 -0500 Subject: [PATCH 1/5] Actually bound the generic thread pool This commit actually bounds the size of the generic thread pool. The generic thread pool was of type cached, a thread pool with an unbounded number of workers and an unbounded work queue. With this commit, the generic thread pool is now of type scaling. As such, the cached thread pool type has been removed. By default, the generic thread pool is constructed with a core pool size of four, a max pool size of 128 and idle workers can be reaped after a keep-alive time of thirty seconds expires. The work queue for this thread pool remains unbounded. --- .../common/util/concurrent/EsExecutors.java | 14 +- .../elasticsearch/threadpool/ThreadPool.java | 186 +++++++------ .../action/bulk/BulkProcessorRetryIT.java | 3 +- .../threadpool/ESThreadPoolTestCase.java | 62 +++++ .../threadpool/ScalingThreadPoolTests.java | 244 ++++++++++++++++++ .../threadpool/ThreadPoolTests.java | 49 ++++ .../UpdateThreadPoolSettingsTests.java | 82 +----- docs/reference/modules/threadpool.asciidoc | 29 +-- .../test/InternalTestCluster.java | 2 +- 9 files changed, 491 insertions(+), 180 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/threadpool/ESThreadPoolTestCase.java create mode 100644 core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java create mode 100644 core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index df1288d4fd2..e06aad96d61 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings; import java.util.Arrays; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; @@ -62,16 +63,11 @@ public class EsExecutors { public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { ExecutorScalingQueue queue = new ExecutorScalingQueue<>(); - // we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder); queue.executor = executor; return executor; } - public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { - return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue(), threadFactory, new EsAbortPolicy(), contextHolder); - } - public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) { BlockingQueue queue; if (queueCapacity < 0) { @@ -151,9 +147,17 @@ public class EsExecutors { @Override public boolean offer(E e) { + // first try to transfer to a waiting worker thread if (!tryTransfer(e)) { + // check if there might be spare capacity in the thread + // pool executor int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); if (left > 0) { + // reject queuing the task to force the thread pool + // executor to add a worker if it can; combined + // with ForceQueuePolicy, this causes the thread + // pool to always scale up to max pool size and we + // only queue when there is no spare capacity return false; } else { return super.offer(e); diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 04860d1f84f..9e95299c159 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -91,7 +91,6 @@ public class ThreadPool extends AbstractComponent implements Closeable { } public enum ThreadPoolType { - CACHED("cached"), DIRECT("direct"), FIXED("fixed"), SCALING("scaling"); @@ -125,12 +124,12 @@ public class ThreadPool extends AbstractComponent implements Closeable { } } - public static Map THREAD_POOL_TYPES; + public static final Map THREAD_POOL_TYPES; static { HashMap map = new HashMap<>(); map.put(Names.SAME, ThreadPoolType.DIRECT); - map.put(Names.GENERIC, ThreadPoolType.CACHED); + map.put(Names.GENERIC, ThreadPoolType.SCALING); map.put(Names.LISTENER, ThreadPoolType.FIXED); map.put(Names.GET, ThreadPoolType.FIXED); map.put(Names.INDEX, ThreadPoolType.FIXED); @@ -153,33 +152,67 @@ public class ThreadPool extends AbstractComponent implements Closeable { executorSettings.put(name, settings); } - private static class ExecutorSettingsBuilder { - Map settings = new HashMap<>(); + private static abstract class ExecutorSettingsBuilder> { - public ExecutorSettingsBuilder(String name) { - settings.put("name", name); - settings.put("type", THREAD_POOL_TYPES.get(name).getType()); + private final Settings.Builder builder; + + protected ExecutorSettingsBuilder(String name, ThreadPoolType threadPoolType) { + if (THREAD_POOL_TYPES.get(name) != threadPoolType) { + throw new IllegalArgumentException("thread pool [" + name + "] must be of type [" + threadPoolType + "]"); + } + builder = Settings.builder(); + builder.put("name", name); + builder.put("type", threadPoolType.getType()); } - public ExecutorSettingsBuilder size(int availableProcessors) { - return add("size", Integer.toString(availableProcessors)); - } - - public ExecutorSettingsBuilder queueSize(int queueSize) { - return add("queue_size", Integer.toString(queueSize)); - } - - public ExecutorSettingsBuilder keepAlive(String keepAlive) { + public T keepAlive(String keepAlive) { return add("keep_alive", keepAlive); } - private ExecutorSettingsBuilder add(String key, String value) { - settings.put(key, value); - return this; + public T queueSize(int queueSize) { + return add("queue_size", queueSize); } - public Settings build() { - return Settings.builder().put(settings).build(); + protected T add(String setting, int value) { + return add(setting, Integer.toString(value)); + } + + + protected T add(String setting, String value) { + builder.put(setting, value); + @SuppressWarnings("unchecked") final T executor = (T)this; + return executor; + } + + public final Settings build() { return builder.build(); } + + } + + private static class FixedExecutorSettingsBuilder extends ExecutorSettingsBuilder { + + public FixedExecutorSettingsBuilder(String name) { + super(name, ThreadPoolType.FIXED); + } + + public FixedExecutorSettingsBuilder size(int size) { + return add("size", Integer.toString(size)); + } + + } + + private static class ScalingExecutorSettingsBuilder extends ExecutorSettingsBuilder { + + public ScalingExecutorSettingsBuilder(String name) { + super(name, ThreadPoolType.SCALING); + } + + public ScalingExecutorSettingsBuilder min(int min) { + return add("min", min); + } + + + public ScalingExecutorSettingsBuilder size(int size) { + return add("size", size); } } @@ -215,25 +248,26 @@ public class ThreadPool extends AbstractComponent implements Closeable { validate(groupSettings); int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5); - int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); + int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors); + int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors); Map defaultExecutorTypeSettings = new HashMap<>(); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GENERIC).size(4 * availableProcessors).keepAlive("30s")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.MANAGEMENT).size(5).keepAlive("5m")); + int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.GENERIC).min(4).size(genericThreadPoolMax).keepAlive("30s")); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.INDEX).size(availableProcessors).queueSize(200)); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.BULK).size(availableProcessors).queueSize(50)); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.GET).size(availableProcessors).queueSize(1000)); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.SEARCH).size(((availableProcessors * 3) / 2) + 1).queueSize(1000)); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.MANAGEMENT).min(1).size(5).keepAlive("5m")); // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded // the assumption here is that the listeners should be very lightweight on the listeners side - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FLUSH).size(halfProcMaxAt5).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.REFRESH).size(halfProcMaxAt10).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.WARMER).size(halfProcMaxAt5).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.SNAPSHOT).size(halfProcMaxAt5).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FORCE_MERGE).size(1)); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).size(availableProcessors * 2).keepAlive("5m")); - add(defaultExecutorTypeSettings, new ExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).size(availableProcessors * 2).keepAlive("5m")); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.LISTENER).size(halfProcMaxAt10)); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FLUSH).min(1).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.REFRESH).min(1).size(halfProcMaxAt10).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.WARMER).min(1).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.SNAPSHOT).min(1).size(halfProcMaxAt5).keepAlive("5m")); + add(defaultExecutorTypeSettings, new FixedExecutorSettingsBuilder(Names.FORCE_MERGE).size(1)); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STARTED).min(1).size(availableProcessors * 2).keepAlive("5m")); + add(defaultExecutorTypeSettings, new ScalingExecutorSettingsBuilder(Names.FETCH_SHARD_STORE).min(1).size(availableProcessors * 2).keepAlive("5m")); this.defaultExecutorTypeSettings = unmodifiableMap(defaultExecutorTypeSettings); @@ -251,9 +285,6 @@ public class ThreadPool extends AbstractComponent implements Closeable { } executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT))); - if (!executors.get(Names.GENERIC).info.getThreadPoolType().equals(ThreadPoolType.CACHED)) { - throw new IllegalArgumentException("generic thread pool must be of type cached"); - } this.executors = unmodifiableMap(executors); this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); @@ -447,49 +478,23 @@ public class ThreadPool extends AbstractComponent implements Closeable { ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name); if (ThreadPoolType.DIRECT == threadPoolType) { if (previousExecutorHolder != null) { - logger.debug("updating thread_pool [{}], type [{}]", name, type); + logger.debug("updating thread pool [{}], type [{}]", name, type); } else { - logger.debug("creating thread_pool [{}], type [{}]", name, type); + logger.debug("creating thread pool [{}], type [{}]", name, type); } return new ExecutorHolder(DIRECT_EXECUTOR, new Info(name, threadPoolType)); - } else if (ThreadPoolType.CACHED == threadPoolType) { - if (!Names.GENERIC.equals(name)) { - throw new IllegalArgumentException("thread pool type cached is reserved only for the generic thread pool and can not be applied to [" + name + "]"); - } - TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); - if (previousExecutorHolder != null) { - if (ThreadPoolType.CACHED == previousInfo.getThreadPoolType()) { - TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); - if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { - logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); - ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); - return new ExecutorHolder(previousExecutorHolder.executor(), new Info(name, threadPoolType, -1, -1, updatedKeepAlive, null)); - } - return previousExecutorHolder; - } - if (previousInfo.getKeepAlive() != null) { - defaultKeepAlive = previousInfo.getKeepAlive(); - } - } - TimeValue keepAlive = settings.getAsTime("keep_alive", defaultKeepAlive); - if (previousExecutorHolder != null) { - logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); - } else { - logger.debug("creating thread_pool [{}], type [{}], keep_alive [{}]", name, type, keepAlive); - } - Executor executor = EsExecutors.newCached(name, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext); - return new ExecutorHolder(executor, new Info(name, threadPoolType, -1, -1, keepAlive, null)); } else if (ThreadPoolType.FIXED == threadPoolType) { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null)); if (previousExecutorHolder != null) { + assert previousInfo != null; if (ThreadPoolType.FIXED == previousInfo.getThreadPoolType()) { SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize()))); if (Objects.equals(previousInfo.getQueueSize(), updatedQueueSize)) { int updatedSize = applyHardSizeLimit(name, settings.getAsInt("size", previousInfo.getMax())); if (previousInfo.getMax() != updatedSize) { - logger.debug("updating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize); + logger.debug("updating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, updatedSize, updatedQueueSize); // if you think this code is crazy: that's because it is! if (updatedSize > previousInfo.getMax()) { ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setMaximumPoolSize(updatedSize); @@ -511,20 +516,24 @@ public class ThreadPool extends AbstractComponent implements Closeable { int size = applyHardSizeLimit(name, settings.getAsInt("size", defaultSize)); SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); - logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); + logger.debug("creating thread pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); Executor executor = EsExecutors.newFixed(name, size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory, threadContext); return new ExecutorHolder(executor, new Info(name, threadPoolType, size, size, null, queueSize)); } else if (ThreadPoolType.SCALING == threadPoolType) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); int defaultMin = defaultSettings.getAsInt("min", 1); int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); + final Integer queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", null)); + if (queueSize != null) { + throw new IllegalArgumentException("thread pool [" + name + "] can not have its queue re-sized but was [" + queueSize + "]"); + } if (previousExecutorHolder != null) { if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); int updatedMin = settings.getAsInt("min", previousInfo.getMin()); int updatedSize = settings.getAsInt("max", settings.getAsInt("size", previousInfo.getMax())); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive) || previousInfo.getMin() != updatedMin || previousInfo.getMax() != updatedSize) { - logger.debug("updating thread_pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); + logger.debug("updating thread pool [{}], type [{}], keep_alive [{}]", name, type, updatedKeepAlive); if (!previousInfo.getKeepAlive().equals(updatedKeepAlive)) { ((EsThreadPoolExecutor) previousExecutorHolder.executor()).setKeepAliveTime(updatedKeepAlive.millis(), TimeUnit.MILLISECONDS); } @@ -552,9 +561,9 @@ public class ThreadPool extends AbstractComponent implements Closeable { int min = settings.getAsInt("min", defaultMin); int size = settings.getAsInt("max", settings.getAsInt("size", defaultSize)); if (previousExecutorHolder != null) { - logger.debug("updating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); + logger.debug("updating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } else { - logger.debug("creating thread_pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); + logger.debug("creating thread pool [{}], type [{}], min [{}], size [{}], keep_alive [{}]", name, type, min, size, keepAlive); } Executor executor = EsExecutors.newScaling(name, min, size, keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, threadContext); return new ExecutorHolder(executor, new Info(name, threadPoolType, min, size, keepAlive, null)); @@ -577,6 +586,32 @@ public class ThreadPool extends AbstractComponent implements Closeable { return size; } + /** + * Constrains a value between minimum and maximum values + * (inclusive). + * + * @param value the value to constrain + * @param min the minimum acceptable value + * @param max the maximum acceptable value + * @return min if value is less than min, max if value is greater + * than value, otherwise value + */ + static int boundedBy(int value, int min, int max) { + return Math.min(max, Math.max(min, value)); + } + + static int halfNumberOfProcessorsMaxFive(int numberOfProcessors) { + return boundedBy((numberOfProcessors + 1) / 2, 1, 5); + } + + static int halfNumberOfProcessorsMaxTen(int numberOfProcessors) { + return boundedBy((numberOfProcessors + 1) / 2, 1, 10); + } + + static int twiceNumberOfProcessors(int numberOfProcessors) { + return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE); + } + private void updateSettings(Settings settings) { Map groupSettings = settings.getAsGroups(); if (groupSettings.isEmpty()) { @@ -969,4 +1004,5 @@ public class ThreadPool extends AbstractComponent implements Closeable { public ThreadContext getThreadContext() { return threadContext; } + } diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 503daba8c2a..1604ab24160 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -50,8 +50,7 @@ public class BulkProcessorRetryIT extends ESIntegTestCase { //Have very low pool and queue sizes to overwhelm internal pools easily return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) - .put("threadpool.generic.size", 1) - .put("threadpool.generic.queue_size", 1) + .put("threadpool.generic.max", 4) // don't mess with this one! It's quite sensitive to a low queue size // (see also ThreadedActionListener which is happily spawning threads even when we already got rejected) //.put("threadpool.listener.queue_size", 1) diff --git a/core/src/test/java/org/elasticsearch/threadpool/ESThreadPoolTestCase.java b/core/src/test/java/org/elasticsearch/threadpool/ESThreadPoolTestCase.java new file mode 100644 index 00000000000..7fbd3ccd31b --- /dev/null +++ b/core/src/test/java/org/elasticsearch/threadpool/ESThreadPoolTestCase.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Map; +import java.util.stream.Collectors; + +public abstract class ESThreadPoolTestCase extends ESTestCase { + + protected final ThreadPool.Info info(final ThreadPool threadPool, final String name) { + for (final ThreadPool.Info info : threadPool.info()) { + if (info.getName().equals(name)) { + return info; + } + } + throw new IllegalArgumentException(name); + } + + protected final ThreadPoolStats.Stats stats(final ThreadPool threadPool, final String name) { + for (final ThreadPoolStats.Stats stats : threadPool.stats()) { + if (name.equals(stats.getName())) { + return stats; + } + } + throw new IllegalArgumentException(name); + } + + protected final void terminateThreadPoolIfNeeded(final ThreadPool threadPool) throws InterruptedException { + if (threadPool != null) { + terminate(threadPool); + } + } + + static String randomThreadPool(final ThreadPool.ThreadPoolType type) { + return randomFrom( + ThreadPool.THREAD_POOL_TYPES + .entrySet().stream() + .filter(t -> t.getValue().equals(type)) + .map(Map.Entry::getKey) + .collect(Collectors.toList())); + } + +} diff --git a/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java new file mode 100644 index 00000000000..e229b466ff7 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -0,0 +1,244 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; + +public class ScalingThreadPoolTests extends ESThreadPoolTestCase { + + public void testScalingThreadPoolConfiguration() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + final Settings.Builder builder = Settings.builder(); + + final int min; + if (randomBoolean()) { + min = randomIntBetween(1, 8); + builder.put("threadpool." + threadPoolName + ".min", min); + } else { + min = "generic".equals(threadPoolName) ? 4 : 1; // the defaults + } + + final int sizeBasedOnNumberOfProcessors; + if (randomBoolean()) { + final int processors = randomIntBetween(1, 64); + sizeBasedOnNumberOfProcessors = expectedSize(threadPoolName, processors); + builder.put("processors", processors); + } else { + sizeBasedOnNumberOfProcessors = expectedSize(threadPoolName, Math.min(32, Runtime.getRuntime().availableProcessors())); + } + + final int expectedSize; + if (sizeBasedOnNumberOfProcessors < min || randomBoolean()) { + expectedSize = randomIntBetween(min + 1, 16); + builder.put("threadpool." + threadPoolName + ".size", expectedSize); + } else { + expectedSize = sizeBasedOnNumberOfProcessors; + } + + final long keepAlive; + if (randomBoolean()) { + keepAlive = randomIntBetween(1, 300); + builder.put("threadpool." + threadPoolName + ".keep_alive", keepAlive + "s"); + } else { + keepAlive = "generic".equals(threadPoolName) ? 30 : 300; // the defaults + } + + runScalingThreadPoolTest(builder.build(), (clusterSettings, threadPool) -> { + final Executor executor = threadPool.executor(threadPoolName); + assertThat(executor, instanceOf(EsThreadPoolExecutor.class)); + final EsThreadPoolExecutor esThreadPoolExecutor = (EsThreadPoolExecutor)executor; + final ThreadPool.Info info = info(threadPool, threadPoolName); + + assertThat(info.getName(), equalTo(threadPoolName)); + assertThat(info.getThreadPoolType(), equalTo(ThreadPool.ThreadPoolType.SCALING)); + + assertThat(info.getKeepAlive().seconds(), equalTo(keepAlive)); + assertThat(esThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS), equalTo(keepAlive)); + + assertNull(info.getQueueSize()); + assertThat(esThreadPoolExecutor.getQueue().remainingCapacity(), equalTo(Integer.MAX_VALUE)); + + assertThat(info.getMin(), equalTo(min)); + assertThat(esThreadPoolExecutor.getCorePoolSize(), equalTo(min)); + assertThat(info.getMax(), equalTo(expectedSize)); + assertThat(esThreadPoolExecutor.getMaximumPoolSize(), equalTo(expectedSize)); + }); + } + + @FunctionalInterface + private interface SizeFunction { + int size(int numberOfProcessors); + } + + private int expectedSize(final String threadPoolName, final int numberOfProcessors) { + final Map sizes = new HashMap<>(); + sizes.put(ThreadPool.Names.GENERIC, n -> ThreadPool.boundedBy(4 * n, 128, 512)); + sizes.put(ThreadPool.Names.MANAGEMENT, n -> 5); + sizes.put(ThreadPool.Names.FLUSH, ThreadPool::halfNumberOfProcessorsMaxFive); + sizes.put(ThreadPool.Names.REFRESH, ThreadPool::halfNumberOfProcessorsMaxTen); + sizes.put(ThreadPool.Names.WARMER, ThreadPool::halfNumberOfProcessorsMaxFive); + sizes.put(ThreadPool.Names.SNAPSHOT, ThreadPool::halfNumberOfProcessorsMaxFive); + sizes.put(ThreadPool.Names.FETCH_SHARD_STARTED, ThreadPool::twiceNumberOfProcessors); + sizes.put(ThreadPool.Names.FETCH_SHARD_STORE, ThreadPool::twiceNumberOfProcessors); + return sizes.get(threadPoolName).size(numberOfProcessors); + } + + public void testValidDynamicKeepAlive() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> { + final Executor beforeExecutor = threadPool.executor(threadPoolName); + final long seconds = randomIntBetween(1, 300); + clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".keep_alive", seconds + "s")); + final Executor afterExecutor = threadPool.executor(threadPoolName); + assertSame(beforeExecutor, afterExecutor); + final ThreadPool.Info info = info(threadPool, threadPoolName); + assertThat(info.getKeepAlive().seconds(), equalTo(seconds)); + }); + } + + public void testScalingThreadPoolIsBounded() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + final int size = randomIntBetween(32, 512); + final Settings settings = Settings.builder().put("threadpool." + threadPoolName + ".size", size).build(); + runScalingThreadPoolTest(settings, (clusterSettings, threadPool) -> { + final CountDownLatch latch = new CountDownLatch(1); + final int numberOfTasks = 2 * size; + final CountDownLatch taskLatch = new CountDownLatch(numberOfTasks); + for (int i = 0; i < numberOfTasks; i++) { + threadPool.executor(threadPoolName).execute(() -> { + try { + latch.await(); + taskLatch.countDown(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + final ThreadPoolStats.Stats stats = stats(threadPool, threadPoolName); + assertThat(stats.getQueue(), equalTo(numberOfTasks - size)); + assertThat(stats.getLargest(), equalTo(size)); + latch.countDown(); + try { + taskLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + + public void testScalingThreadPoolThreadsAreTerminatedAfterKeepAlive() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + final Settings settings = + Settings.builder() + .put("threadpool." + threadPoolName + ".size", 128) + .put("threadpool." + threadPoolName + ".keep_alive", "1ms") + .build(); + runScalingThreadPoolTest(settings, ((clusterSettings, threadPool) -> { + final CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < 128; i++) { + threadPool.executor(threadPoolName).execute(() -> { + try { + latch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + final int active = stats(threadPool, threadPoolName).getThreads(); + assertThat(active, equalTo(128)); + latch.countDown(); + do { + spinForAtLeastOneMillisecond(); + } while (stats(threadPool, threadPoolName).getThreads() > 4); + assertThat(stats(threadPool, threadPoolName).getCompleted(), equalTo(128L)); + })); + } + + public void testDynamicThreadPoolSize() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> { + final Executor beforeExecutor = threadPool.executor(threadPoolName); + final int size = randomIntBetween(128, 512); + clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".size", size)); + final Executor afterExecutor = threadPool.executor(threadPoolName); + assertSame(beforeExecutor, afterExecutor); + final ThreadPool.Info info = info(threadPool, threadPoolName); + int expectedMin = "generic".equals(threadPoolName) ? 4 : 1; + assertThat(info.getMin(), equalTo(expectedMin)); + assertThat(info.getMax(), equalTo(size)); + + assertThat(afterExecutor, instanceOf(EsThreadPoolExecutor.class)); + final EsThreadPoolExecutor executor = (EsThreadPoolExecutor)afterExecutor; + assertThat(executor.getCorePoolSize(), equalTo(expectedMin)); + assertThat(executor.getMaximumPoolSize(), equalTo(size)); + }); + } + + public void testResizingScalingThreadPoolQueue() throws InterruptedException { + final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); + runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> { + final int size = randomIntBetween(1, Integer.MAX_VALUE); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".queue_size", size))); + assertThat(e, hasToString( + "java.lang.IllegalArgumentException: thread pool [" + threadPoolName + "] can not have its queue re-sized but was [" + + size + "]")); + }); + } + + public void runScalingThreadPoolTest( + final Settings settings, + final BiConsumer consumer) throws InterruptedException { + ThreadPool threadPool = null; + try { + final String test = Thread.currentThread().getStackTrace()[2].getMethodName(); + final Settings nodeSettings = Settings.builder().put(settings).put("node.name", test).build(); + threadPool = new ThreadPool(nodeSettings); + final ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + threadPool.setClusterSettings(clusterSettings); + consumer.accept(clusterSettings, threadPool); + } finally { + terminateThreadPoolIfNeeded(threadPool); + } + } + + private static Settings settings(final String setting, final int value) { + return settings(setting, Integer.toString(value)); + } + + private static Settings settings(final String setting, final String value) { + return Settings.builder().put(setting, value).build(); + } + +} diff --git a/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java new file mode 100644 index 00000000000..daad1a51a08 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/threadpool/ThreadPoolTests.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.threadpool; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.CoreMatchers.equalTo; + +public class ThreadPoolTests extends ESTestCase { + + public void testBoundedByBelowMin() { + int min = randomIntBetween(0, 32); + int max = randomIntBetween(min + 1, 64); + int value = randomIntBetween(Integer.MIN_VALUE, min - 1); + assertThat(ThreadPool.boundedBy(value, min, max), equalTo(min)); + } + + public void testBoundedByAboveMax() { + int min = randomIntBetween(0, 32); + int max = randomIntBetween(min + 1, 64); + int value = randomIntBetween(max + 1, Integer.MAX_VALUE); + assertThat(ThreadPool.boundedBy(value, min, max), equalTo(max)); + } + + public void testBoundedByBetweenMinAndMax() { + int min = randomIntBetween(0, 32); + int max = randomIntBetween(min + 1, 64); + int value = randomIntBetween(min, max); + assertThat(ThreadPool.boundedBy(value, min, max), equalTo(value)); + } + +} diff --git a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index a3c4e46892d..43e8e7e7af5 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool.Names; import java.lang.reflect.Field; @@ -46,7 +45,7 @@ import static org.hamcrest.Matchers.sameInstance; /** */ -public class UpdateThreadPoolSettingsTests extends ESTestCase { +public class UpdateThreadPoolSettingsTests extends ESThreadPoolTestCase { public void testCorrectThreadPoolTypePermittedInSettings() throws InterruptedException { String threadPoolName = randomThreadPoolName(); @@ -162,56 +161,6 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { } } - public void testCachedExecutorType() throws InterruptedException { - String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.CACHED); - ThreadPool threadPool = null; - try { - Settings nodeSettings = Settings.builder() - .put("node.name", "testCachedExecutorType").build(); - threadPool = new ThreadPool(nodeSettings); - ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - threadPool.setClusterSettings(clusterSettings); - - assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); - assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - - Settings settings = clusterSettings.applySettings(Settings.builder() - .put("threadpool." + threadPoolName + ".keep_alive", "10m") - .build()); - assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); - assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(0)); - // Make sure keep alive value changed - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(10L)); - - // Make sure keep alive value reused - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(10L)); - assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - - // Change keep alive - Executor oldExecutor = threadPool.executor(threadPoolName); - settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".keep_alive", "1m").build()); - // Make sure keep alive value changed - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); - assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); - - // Set the same keep alive - settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".keep_alive", "1m").build()); - // Make sure keep alive value didn't change - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(1L)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(1L)); - // Make sure executor didn't change - assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.CACHED); - assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); - } finally { - terminateThreadPoolIfNeeded(threadPool); - } - } - private static int getExpectedThreadPoolSize(Settings settings, String name, int size) { if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) { return Math.min(size, EsExecutors.boundedNumberOfProcessors(settings)); @@ -273,7 +222,7 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor)); // Change queue capacity - settings = clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".queue", "500") + clusterSettings.applySettings(Settings.builder().put(settings).put("threadpool." + threadPoolName + ".queue", "500") .build()); } finally { terminateThreadPoolIfNeeded(threadPool); @@ -290,9 +239,11 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { threadPool = new ThreadPool(nodeSettings); ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); threadPool.setClusterSettings(clusterSettings); - assertThat(info(threadPool, threadPoolName).getMin(), equalTo(1)); + final int expectedMinimum = "generic".equals(threadPoolName) ? 4 : 1; + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedMinimum)); assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10)); - assertThat(info(threadPool, threadPoolName).getKeepAlive().minutes(), equalTo(5L)); + final long expectedKeepAlive = "generic".equals(threadPoolName) ? 30 : 300; + assertThat(info(threadPool, threadPoolName).getKeepAlive().seconds(), equalTo(expectedKeepAlive)); assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.SCALING); assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); @@ -358,6 +309,9 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { try { Settings nodeSettings = Settings.builder() .put("threadpool.my_pool1.type", "scaling") + .put("threadpool.my_pool1.min", 1) + .put("threadpool.my_pool1.size", EsExecutors.boundedNumberOfProcessors(Settings.EMPTY)) + .put("threadpool.my_pool1.keep_alive", "1m") .put("threadpool.my_pool2.type", "fixed") .put("threadpool.my_pool2.size", "1") .put("threadpool.my_pool2.queue_size", "1") @@ -429,21 +383,6 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { } } - private void terminateThreadPoolIfNeeded(ThreadPool threadPool) throws InterruptedException { - if (threadPool != null) { - terminate(threadPool); - } - } - - private ThreadPool.Info info(ThreadPool threadPool, String name) { - for (ThreadPool.Info info : threadPool.info()) { - if (info.getName().equals(name)) { - return info; - } - } - return null; - } - private String randomThreadPoolName() { Set threadPoolNames = ThreadPool.THREAD_POOL_TYPES.keySet(); return randomFrom(threadPoolNames.toArray(new String[threadPoolNames.size()])); @@ -456,7 +395,4 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { return randomFrom(set.toArray(new ThreadPool.ThreadPoolType[set.size()])); } - private String randomThreadPool(ThreadPool.ThreadPoolType type) { - return randomFrom(ThreadPool.THREAD_POOL_TYPES.entrySet().stream().filter(t -> t.getValue().equals(type)).map(Map.Entry::getKey).collect(Collectors.toList())); - } } diff --git a/docs/reference/modules/threadpool.asciidoc b/docs/reference/modules/threadpool.asciidoc index 199b6a9b88c..fe9900b21f7 100644 --- a/docs/reference/modules/threadpool.asciidoc +++ b/docs/reference/modules/threadpool.asciidoc @@ -11,7 +11,7 @@ There are several thread pools, but the important ones include: `generic`:: For generic operations (e.g., background node discovery). - Thread pool type is `cached`. + Thread pool type is `scaling`. `index`:: For index/delete operations. Thread pool type is `fixed` @@ -72,26 +72,6 @@ NOTE: you can update thread pool settings dynamically using <> thread pool. - -The `keep_alive` parameter determines how long a thread should be kept -around in the thread pool without doing any work. - -[source,js] --------------------------------------------------- -threadpool: - generic: - keep_alive: 2m --------------------------------------------------- - [float] ==== `fixed` @@ -118,9 +98,9 @@ threadpool: [float] ==== `scaling` -The `scaling` thread pool holds a dynamic number of threads. This number is -proportional to the workload and varies between 1 and the value of the -`size` parameter. +The `scaling` thread pool holds a dynamic number of threads. This +number is proportional to the workload and varies between the value of +the `min` and `size` parameters. The `keep_alive` parameter determines how long a thread should be kept around in the thread pool without it doing any work. @@ -129,6 +109,7 @@ around in the thread pool without it doing any work. -------------------------------------------------- threadpool: warmer: + min: 1 size: 8 keep_alive: 2m -------------------------------------------------- diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 6a72d5cc1a9..c1ce8deffe2 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -314,7 +314,7 @@ public final class InternalTestCluster extends TestCluster { // always reduce this - it can make tests really slow builder.put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.getKey(), TimeValue.timeValueMillis(RandomInts.randomIntBetween(random, 20, 50))); defaultSettings = builder.build(); - executor = EsExecutors.newCached("test runner", 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY)); + executor = EsExecutors.newScaling("test runner", 0, Integer.MAX_VALUE, 0, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory("test_" + clusterName), new ThreadContext(Settings.EMPTY)); } public static String configuredNodeMode() { From d032de2df2305a8bc183dc685400d05cb7cacf63 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 24 Apr 2016 09:43:28 -0400 Subject: [PATCH 2/5] Cleanup o/e/c/u/c/EsExecutors.java This commit removes two unused imports and applies a few other formatting cleanups to EsExecutors.java. --- .../common/util/concurrent/EsExecutors.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index e06aad96d61..2d45a6fecff 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -25,18 +25,13 @@ import org.elasticsearch.common.settings.Settings; import java.util.Arrays; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -/** - * - */ public class EsExecutors { /** @@ -110,6 +105,7 @@ public class EsExecutors { } static class EsThreadFactory implements ThreadFactory { + final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; @@ -129,6 +125,7 @@ public class EsExecutors { t.setDaemon(true); return t; } + } /** @@ -137,7 +134,6 @@ public class EsExecutors { private EsExecutors() { } - static class ExecutorScalingQueue extends LinkedTransferQueue { ThreadPoolExecutor executor; @@ -166,6 +162,7 @@ public class EsExecutors { return true; } } + } /** @@ -188,4 +185,5 @@ public class EsExecutors { return 0; } } + } From e6a06b272e23d1abf07f2b9b8bc74fd195b94dd4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 24 Apr 2016 09:49:15 -0400 Subject: [PATCH 3/5] Clarify message on resize scaling executor queues This commit clarifies an error message that is produced when an attempt is made to resize the backing queue for a scaling executor. As this queue is unbounded, resizing the backing queue does not make sense. The clarification here is to specify that this restriction is because the executor is a scaling executor. --- .../src/main/java/org/elasticsearch/threadpool/ThreadPool.java | 2 +- .../org/elasticsearch/threadpool/ScalingThreadPoolTests.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 9e95299c159..1f5baec1040 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -525,7 +525,7 @@ public class ThreadPool extends AbstractComponent implements Closeable { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); final Integer queueSize = settings.getAsInt("queue_size", defaultSettings.getAsInt("queue_size", null)); if (queueSize != null) { - throw new IllegalArgumentException("thread pool [" + name + "] can not have its queue re-sized but was [" + queueSize + "]"); + throw new IllegalArgumentException("thread pool [" + name + "] of type scaling can not have its queue re-sized but was [" + queueSize + "]"); } if (previousExecutorHolder != null) { if (ThreadPoolType.SCALING == previousInfo.getThreadPoolType()) { diff --git a/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java index e229b466ff7..a73ffdb2129 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -212,7 +212,8 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase { IllegalArgumentException.class, () -> clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".queue_size", size))); assertThat(e, hasToString( - "java.lang.IllegalArgumentException: thread pool [" + threadPoolName + "] can not have its queue re-sized but was [" + + "java.lang.IllegalArgumentException: thread pool [" + threadPoolName + + "] of type scaling can not have its queue re-sized but was [" + size + "]")); }); } From fd679a702165e45b51d6c7419a00fc1d02db4ee4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 24 Apr 2016 09:59:15 -0400 Subject: [PATCH 4/5] Test min pool size of zero for scaling executor This commit expands the configuration test for scaling executors to include the case where the min pool size is set to zero. --- .../org/elasticsearch/threadpool/ScalingThreadPoolTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java index a73ffdb2129..d956cc96abf 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -42,7 +42,7 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase { final int min; if (randomBoolean()) { - min = randomIntBetween(1, 8); + min = randomIntBetween(0, 8); builder.put("threadpool." + threadPoolName + ".min", min); } else { min = "generic".equals(threadPoolName) ? 4 : 1; // the defaults From b89a935be52a4b607f27fd95161ee492f7702fe3 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 26 Apr 2016 08:17:54 -0400 Subject: [PATCH 5/5] Expand scaling thread pool configuration coverage This commit slightly expands the scaling thread pool configuration test coverage. In particular, the test testScalingThreadPoolConfiguration is expanded to include the case when min is equal to size, and the test testDynamicThreadPoolSize is expanded to include all possible cases when size is greater than or equal to min. --- .../elasticsearch/threadpool/ScalingThreadPoolTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java index d956cc96abf..9c911d1e25c 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/ScalingThreadPoolTests.java @@ -59,7 +59,7 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase { final int expectedSize; if (sizeBasedOnNumberOfProcessors < min || randomBoolean()) { - expectedSize = randomIntBetween(min + 1, 16); + expectedSize = randomIntBetween(min, 16); builder.put("threadpool." + threadPoolName + ".size", expectedSize); } else { expectedSize = sizeBasedOnNumberOfProcessors; @@ -188,12 +188,12 @@ public class ScalingThreadPoolTests extends ESThreadPoolTestCase { final String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.SCALING); runScalingThreadPoolTest(Settings.EMPTY, (clusterSettings, threadPool) -> { final Executor beforeExecutor = threadPool.executor(threadPoolName); - final int size = randomIntBetween(128, 512); + int expectedMin = "generic".equals(threadPoolName) ? 4 : 1; + final int size = randomIntBetween(expectedMin, Integer.MAX_VALUE); clusterSettings.applySettings(settings("threadpool." + threadPoolName + ".size", size)); final Executor afterExecutor = threadPool.executor(threadPoolName); assertSame(beforeExecutor, afterExecutor); final ThreadPool.Info info = info(threadPool, threadPoolName); - int expectedMin = "generic".equals(threadPoolName) ? 4 : 1; assertThat(info.getMin(), equalTo(expectedMin)); assertThat(info.getMax(), equalTo(size));