add logger.warn if thread pool size is clipped; fix test failure

This commit is contained in:
Michael McCandless 2016-01-05 10:59:40 -05:00 committed by mikemccand
parent 7ef43a22ff
commit 8e7719d136
2 changed files with 32 additions and 18 deletions

View File

@ -533,18 +533,19 @@ public class ThreadPool extends AbstractComponent {
throw new IllegalArgumentException("No type found [" + type + "], for [" + name + "]");
}
private int applyHardSizeLimit(String name, int requestedSize) {
private int applyHardSizeLimit(String name, int size) {
int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
if (name.equals(Names.BULK) || name.equals(Names.INDEX)) {
if ((name.equals(Names.BULK) || name.equals(Names.INDEX)) && size > availableProcessors) {
// We use a hard max size for the indexing pools, because if too many threads enter Lucene's IndexWriter, it means
// too many segments written, too frequently, too much merging, etc:
// TODO: I would love to be loud here (throw an exception if you ask for a too-big size), but I think this is dangerous
// because on upgrade this setting could be in cluster state and hard for the user to correct?
return Math.min(requestedSize, availableProcessors);
} else {
return requestedSize;
logger.warn("requested thread pool size [{}] for [{}] is too large; setting to maximum [{}] instead",
size, name, availableProcessors);
size = availableProcessors;
}
return size;
}
private void updateSettings(Settings settings) {

View File

@ -211,6 +211,14 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
}
}
private static int getExpectedThreadPoolSize(Settings settings, String name, int size) {
if (name.equals(ThreadPool.Names.BULK) || name.equals(ThreadPool.Names.INDEX)) {
return Math.min(size, EsExecutors.boundedNumberOfProcessors(settings));
} else {
return size;
}
}
public void testFixedExecutorType() throws InterruptedException {
String threadPoolName = randomThreadPool(ThreadPool.ThreadPoolType.FIXED);
ThreadPool threadPool = null;
@ -225,12 +233,14 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
Settings settings = clusterSettings.applySettings(settingsBuilder()
.put("threadpool." + threadPoolName + ".size", "15")
.build());
int expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 15);
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
// keep alive does not apply to fixed thread pools
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getKeepAliveTime(TimeUnit.MINUTES), equalTo(0L));
@ -240,20 +250,23 @@ public class UpdateThreadPoolSettingsTests extends ESTestCase {
// Make sure keep alive value is not used
assertThat(info(threadPool, threadPoolName).getKeepAlive(), nullValue());
// Make sure keep pool size value were reused
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(15));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
assertThat(threadPool.executor(threadPoolName), instanceOf(EsThreadPoolExecutor.class));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(15));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
// Change size
Executor oldExecutor = threadPool.executor(threadPoolName);
settings = clusterSettings.applySettings(settingsBuilder().put(settings).put("threadpool." + threadPoolName + ".size", "10").build());
expectedSize = getExpectedThreadPoolSize(nodeSettings, threadPoolName, 10);
// Make sure size values changed
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(10));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(10));
assertThat(info(threadPool, threadPoolName).getMax(), equalTo(expectedSize));
assertThat(info(threadPool, threadPoolName).getMin(), equalTo(expectedSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getMaximumPoolSize(), equalTo(expectedSize));
assertThat(((EsThreadPoolExecutor) threadPool.executor(threadPoolName)).getCorePoolSize(), equalTo(expectedSize));
// Make sure executor didn't change
assertEquals(info(threadPool, threadPoolName).getThreadPoolType(), ThreadPool.ThreadPoolType.FIXED);
assertThat(threadPool.executor(threadPoolName), sameInstance(oldExecutor));