Core: Allow to configure custom thread pools

Closes #8247
This commit is contained in:
Martijn van Groningen 2014-10-27 17:48:11 +01:00
parent 2ebf34b93e
commit 7761154e83
2 changed files with 104 additions and 4 deletions

View File

@ -25,8 +25,6 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import org.apache.lucene.util.Counter; import org.apache.lucene.util.Counter;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -38,7 +36,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.*; import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -134,6 +135,15 @@ public class ThreadPool extends AbstractComponent {
for (Map.Entry<String, Settings> executor : defaultExecutorTypeSettings.entrySet()) { for (Map.Entry<String, Settings> executor : defaultExecutorTypeSettings.entrySet()) {
executors.put(executor.getKey(), build(executor.getKey(), groupSettings.get(executor.getKey()), executor.getValue())); executors.put(executor.getKey(), build(executor.getKey(), groupSettings.get(executor.getKey()), executor.getValue()));
} }
// Building custom thread pools
for (Map.Entry<String, Settings> entry : groupSettings.entrySet()) {
if (executors.containsKey(entry.getKey())) {
continue;
}
executors.put(entry.getKey(), build(entry.getKey(), entry.getValue(), ImmutableSettings.EMPTY));
}
executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.directExecutor(), new Info(Names.SAME, "same"))); executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.directExecutor(), new Info(Names.SAME, "same")));
if (!executors.get(Names.GENERIC).info.getType().equals("cached")) { if (!executors.get(Names.GENERIC).info.getType().equals("cached")) {
throw new ElasticsearchIllegalArgumentException("generic thread pool must be of type cached"); throw new ElasticsearchIllegalArgumentException("generic thread pool must be of type cached");
@ -423,6 +433,26 @@ public class ThreadPool extends AbstractComponent {
} }
} }
} }
// Building custom thread pools
for (Map.Entry<String, Settings> entry : groupSettings.entrySet()) {
if (defaultExecutorTypeSettings.containsKey(entry.getKey())) {
continue;
}
ExecutorHolder oldExecutorHolder = executors.get(entry.getKey());
ExecutorHolder newExecutorHolder = rebuild(entry.getKey(), oldExecutorHolder, entry.getValue(), ImmutableSettings.EMPTY);
// Can't introduce new thread pools at runtime, because The oldExecutorHolder variable will be null in the
// case the settings contains a thread pool not defined in the initial settings in the constructor. The if
// statement will then fail and so this prevents the addition of new thread groups at runtime, which is desired.
if (!newExecutorHolder.equals(oldExecutorHolder)) {
executors = newMapBuilder(executors).put(entry.getKey(), newExecutorHolder).immutableMap();
if (!oldExecutorHolder.executor().equals(newExecutorHolder.executor()) && oldExecutorHolder.executor() instanceof EsThreadPoolExecutor) {
retiredExecutors.add(oldExecutorHolder);
((EsThreadPoolExecutor) oldExecutorHolder.executor()).shutdown(new ExecutorShutdownListener(oldExecutorHolder));
}
}
}
} }
/** /**

View File

@ -19,14 +19,15 @@
package org.elasticsearch.threadpool; package org.elasticsearch.threadpool;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.test.ElasticsearchTestCase; import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.threadpool.ThreadPool.Names;
import org.junit.Test; import org.junit.Test;
import java.lang.reflect.Field;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -228,4 +229,73 @@ public class UpdateThreadPoolSettingsTests extends ElasticsearchTestCase {
terminate(threadPool); terminate(threadPool);
} }
@Test
public void testCustomThreadPool() throws Exception {
ThreadPool threadPool = new ThreadPool(ImmutableSettings.settingsBuilder()
.put("threadpool.my_pool1.type", "cached")
.put("threadpool.my_pool2.type", "fixed")
.put("threadpool.my_pool2.size", "1")
.put("threadpool.my_pool2.queue_size", "1")
.put("name", "testCustomThreadPool").build(), null);
ThreadPoolInfo groups = threadPool.info();
boolean foundPool1 = false;
boolean foundPool2 = false;
outer: for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getType(), equalTo("fixed"));
assertThat(info.getMin(), equalTo(1));
assertThat(info.getMax(), equalTo(1));
assertThat(info.getQueueSize().singles(), equalTo(1l));
} else {
for (Field field : Names.class.getFields()) {
if (info.getName().equalsIgnoreCase(field.getName())) {
// This is ok it is a default thread pool
continue outer;
}
}
fail("Unexpected pool name: " + info.getName());
}
}
assertThat(foundPool1, is(true));
assertThat(foundPool2, is(true));
// Updating my_pool2
Settings settings = ImmutableSettings.builder()
.put("threadpool.my_pool2.size", "10")
.build();
threadPool.updateSettings(settings);
groups = threadPool.info();
foundPool1 = false;
foundPool2 = false;
outer: for (ThreadPool.Info info : groups) {
if ("my_pool1".equals(info.getName())) {
foundPool1 = true;
assertThat(info.getType(), equalTo("cached"));
} else if ("my_pool2".equals(info.getName())) {
foundPool2 = true;
assertThat(info.getMax(), equalTo(10));
assertThat(info.getMin(), equalTo(10));
assertThat(info.getQueueSize().singles(), equalTo(1l));
assertThat(info.getType(), equalTo("fixed"));
} else {
for (Field field : Names.class.getFields()) {
if (info.getName().equalsIgnoreCase(field.getName())) {
// This is ok it is a default thread pool
continue outer;
}
}
fail("Unexpected pool name: " + info.getName());
}
}
assertThat(foundPool1, is(true));
assertThat(foundPool2, is(true));
terminate(threadPool);
}
} }