Limit the number created threads for machines with large number of cores

For machines with lots of cores ie. >= 48 the number of threads
created by default might cause unecessary memory pressure on the system
and can even lead to OOM where the system is not able to create any
native threads anymore. This commit limits the number of available
CPUs on the system used for thread pool initialization to at most
24 cores.

Closes #3478
This commit is contained in:
Simon Willnauer 2013-08-15 00:04:44 +02:00
parent 28ae4d6393
commit 0472bac2ef
3 changed files with 18 additions and 6 deletions

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.http.*;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
@ -126,7 +127,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
this.resetCookies = componentSettings.getAsBoolean("reset_cookies", settings.getAsBoolean("http.reset_cookies", false));
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors() * 2);
this.workerCount = componentSettings.getAsInt("worker_count", ThreadPool.boundedNumberOfProcessors() * 2);
this.blockingServer = settings.getAsBoolean("http.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("http.port", "9200-9300"));
this.bindHost = componentSettings.get("bind_host", settings.get("http.bind_host", settings.get("http.host")));

View File

@ -97,7 +97,7 @@ public class ThreadPool extends AbstractComponent {
Map<String, Settings> groupSettings = settings.getGroups(THREADPOOL_GROUP);
int availableProcessors = Runtime.getRuntime().availableProcessors();
int availableProcessors = boundedNumberOfProcessors();
int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);
defaultExecutorTypeSettings = ImmutableMap.<String, Settings>builder()
@ -297,7 +297,7 @@ public class ThreadPool extends AbstractComponent {
threadFactory);
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) {
int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5);
int defaultSize = defaultSettings.getAsInt("size", boundedNumberOfProcessors() * 5);
SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null));
String defaultRejectSetting = defaultSettings.get("reject_policy", "abort");
String defaultQueueType = defaultSettings.get("queue_type", "linked");
@ -350,7 +350,7 @@ public class ThreadPool extends AbstractComponent {
} else if ("scaling".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5);
int defaultSize = defaultSettings.getAsInt("size", boundedNumberOfProcessors() * 5);
if (previousExecutorHolder != null) {
if ("scaling".equals(previousInfo.getType())) {
TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());
@ -394,7 +394,7 @@ public class ThreadPool extends AbstractComponent {
} else if ("blocking".equals(type)) {
TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
int defaultMin = defaultSettings.getAsInt("min", 1);
int defaultSize = defaultSettings.getAsInt("size", Runtime.getRuntime().availableProcessors() * 5);
int defaultSize = defaultSettings.getAsInt("size", boundedNumberOfProcessors() * 5);
SizeValue defaultQueueSize = defaultSettings.getAsSize("queue_size", new SizeValue(1000));
TimeValue defaultWaitTime = defaultSettings.getAsTime("wait_time", timeValueSeconds(60));
if (previousExecutorHolder != null) {
@ -806,6 +806,17 @@ public class ThreadPool extends AbstractComponent {
}
}
/**
* Returns the number of processors available but at most <tt>24</tt>.
*/
public static int boundedNumberOfProcessors() {
/* This relates to issues where machines with large number of cores
* ie. >= 48 create too many threads and run into OOM see #3478
* We just use an 24 core upper-bound here to not stress the system
* too much with too many created threads */
return Math.min(24, Runtime.getRuntime().availableProcessors());
}
class ApplySettings implements NodeSettingsService.Listener {
@Override

View File

@ -171,7 +171,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
connectMutex[i] = new Object();
}
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors() * 2);
this.workerCount = componentSettings.getAsInt("worker_count", ThreadPool.boundedNumberOfProcessors() * 2);
this.bossCount = componentSettings.getAsInt("boss_count", 1);
this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));