From 8e7719d1364e3ca2df5bac95a9e1887f827b83ff Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Tue, 5 Jan 2016 10:59:40 -0500 Subject: [PATCH] add logger.warn if thread pool size is clipped; fix test failure --- .../elasticsearch/threadpool/ThreadPool.java | 13 ++++--- .../UpdateThreadPoolSettingsTests.java | 37 +++++++++++++------ 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 7e1ace21d1a..9b8b9331b9e 100644 --- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -533,18 +533,19 @@ public class ThreadPool extends AbstractComponent { throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]"); } - private int applyHardSizeLimit(String name, int requestedSize) { + private int applyHardSizeLimit(String name, int size) { int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - if (name.equals(Names.BULK) || name.equals(Names.INDEX)) { + if ((name.equals(Names.BULK) || name.equals(Names.INDEX)) && size > availableProcessors) { // We use a hard max size for the indexing pools, because if too many threads enter Lucene's IndexWriter, it means // too many segments written, too frequently, too much merging, etc: - // TODO: I would love to be loud here (throw an exception if you ask for a too-big size), but I think this is dangerous // because on upgrade this setting could be in cluster state and hard for the user to correct? - return Math.min(requestedSize, availableProcessors); - } else { - return requestedSize; + logger.warn("requested thread pool size [{}] for [{}] is too large; setting to maximum [{}] instead", + size, name, availableProcessors); + size = availableProcessors; } + + return size; } private void updateSettings(Settings settings) { diff --git a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index a9f22da864a..09653c12e07 100644 --- a/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/core/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -211,6 +211,14 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { } } + private static int getExpectedThreadPoolSize(Settings settings, String name, int size) { + if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) { + return Math.min(size, EsExecutors.boundedNumberOfProcessors(settings)); + } else { + return size; + } + } + public void testFixedExecutorType() throws InterruptedException { String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED); ThreadPool threadPool = null; @@ -225,12 +233,14 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { Settings settings = clusterSettings.applySettings(settingsBuilder() .put("threadpool." + threadPoolName + ".size", "15") .build()); + + int expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 15); assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); - assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15)); - assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize)); // keep alive does not apply to fixed thread pools assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L)); @@ -240,20 +250,23 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase { // Make sure keep alive value is not used assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue()); // Make sure keep pool size value were reused - assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15)); - assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize)); assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize)); // Change size Executor oldExecutor = threadPool.executor(threadPoolName); settings = clusterSettings.applySettings(settingsBuilder().put(settings).put("threadpool." + threadPoolName + ".size", "10").build()); + + expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 10); + // Make sure size values changed - assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10)); - assertThat(info(threadPool, threadPoolName).getMin(), equalTo(10)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(10)); - assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(10)); + assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize)); + assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize)); + assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize)); // Make sure executor didn't change assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED); assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));