From 7761154e8341690a9eb622644ec6d7b094156158 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Mon, 27 Oct 2014 17:48:11 +0100 Subject: [PATCH] Core: Allow to configure custom thread pools Closes #8247 --- .../elasticsearch/threadpool/ThreadPool.java | 36 +++++++++- .../UpdateThreadPoolSettingsTests.java | 72 ++++++++++++++++++- 2 files changed, 104 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 62e5ccd0811..13450d80d1b 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -25,8 +25,6 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import org.apache.lucene.util.Counter; import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -38,7 +36,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.*; +import org.elasticsearch.common.util.concurrent.EsAbortPolicy; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; +import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; @@ -134,6 +135,15 @@ public class ThreadPool extends AbstractComponent { for (Map.Entry executor : defaultExecutorTypeSettings.entrySet()) { executors.put(executor.getKey(), build(executor.getKey(), groupSettings.get(executor.getKey()), executor.getValue())); } + + // Building custom thread pools + for (Map.Entry entry : groupSettings.entrySet()) { + if (executors.containsKey(entry.getKey())) { + continue; + } + executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), ImmutableSettings.EMPTY)); + } + executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.directExecutor(), new Info(Names.SAME, "same"))); if (!executors.get(Names.GENERIC).info.getType().equals("cached")) { throw new ElasticsearchIllegalArgumentException("generic thread pool must be of type cached"); @@ -423,6 +433,26 @@ public class ThreadPool extends AbstractComponent { } } } + + // Building custom thread pools + for (Map.Entry entry : groupSettings.entrySet()) { + if (defaultExecutorTypeSettings.containsKey(entry.getKey())) { + continue; + } + + ExecutorHolder oldExecutorHolder = executors.get(entry.getKey()); + ExecutorHolder newExecutorHolder = rebuild(entry.getKey(), oldExecutorHolder, entry.getValue(), ImmutableSettings.EMPTY); + // Can't introduce new thread pools at runtime, because The oldExecutorHolder variable will be null in the + // case the settings contains a thread pool not defined in the initial settings in the constructor. The if + // statement will then fail and so this prevents the addition of new thread groups at runtime, which is desired. + if (!newExecutorHolder.equals(oldExecutorHolder)) { + executors = newMapBuilder(executors).put(entry.getKey(), newExecutorHolder).immutableMap(); + if (!oldExecutorHolder.executor().equals(newExecutorHolder.executor()) && oldExecutorHolder.executor() instanceof EsThreadPoolExecutor) { + retiredExecutors.add(oldExecutorHolder); + ((EsThreadPoolExecutor) oldExecutorHolder.executor()).shutdown(new ExecutorShutdownListener(oldExecutorHolder)); + } + } + } } /** diff --git a/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java b/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java index 92ca587a5a6..443ebfbc28f 100644 --- a/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java +++ b/src/test/java/org/elasticsearch/threadpool/UpdateThreadPoolSettingsTests.java @@ -19,14 +19,15 @@ package org.elasticsearch.threadpool; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.threadpool.ThreadPool.Names; import org.junit.Test; +import java.lang.reflect.Field; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @@ -228,4 +229,73 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase { terminate(threadPool); } + @Test + public void testCustomThreadPool() throws Exception { + ThreadPool threadPool = new ThreadPool(ImmutableSettings.settingsBuilder() + .put("threadpool.my_pool1.type", "cached") + .put("threadpool.my_pool2.type", "fixed") + .put("threadpool.my_pool2.size", "1") + .put("threadpool.my_pool2.queue_size", "1") + .put("name", "testCustomThreadPool").build(), null); + + ThreadPoolInfo groups = threadPool.info(); + boolean foundPool1 = false; + boolean foundPool2 = false; + outer: for (ThreadPool.Info info : groups) { + if ("my_pool1".equals(info.getName())) { + foundPool1 = true; + assertThat(info.getType(), equalTo("cached")); + } else if ("my_pool2".equals(info.getName())) { + foundPool2 = true; + assertThat(info.getType(), equalTo("fixed")); + assertThat(info.getMin(), equalTo(1)); + assertThat(info.getMax(), equalTo(1)); + assertThat(info.getQueueSize().singles(), equalTo(1l)); + } else { + for (Field field : Names.class.getFields()) { + if (info.getName().equalsIgnoreCase(field.getName())) { + // This is ok it is a default thread pool + continue outer; + } + } + fail("Unexpected pool name: " + info.getName()); + } + } + assertThat(foundPool1, is(true)); + assertThat(foundPool2, is(true)); + + // Updating my_pool2 + Settings settings = ImmutableSettings.builder() + .put("threadpool.my_pool2.size", "10") + .build(); + threadPool.updateSettings(settings); + + groups = threadPool.info(); + foundPool1 = false; + foundPool2 = false; + outer: for (ThreadPool.Info info : groups) { + if ("my_pool1".equals(info.getName())) { + foundPool1 = true; + assertThat(info.getType(), equalTo("cached")); + } else if ("my_pool2".equals(info.getName())) { + foundPool2 = true; + assertThat(info.getMax(), equalTo(10)); + assertThat(info.getMin(), equalTo(10)); + assertThat(info.getQueueSize().singles(), equalTo(1l)); + assertThat(info.getType(), equalTo("fixed")); + } else { + for (Field field : Names.class.getFields()) { + if (info.getName().equalsIgnoreCase(field.getName())) { + // This is ok it is a default thread pool + continue outer; + } + } + fail("Unexpected pool name: " + info.getName()); + } + } + assertThat(foundPool1, is(true)); + assertThat(foundPool2, is(true)); + terminate(threadPool); + } + }