Bound processor size based calcs to 32
We use the number of processors to choose default thread pool sizes and the number of workers in networking (for HTTP and transport). Bound it to a maximum of 32 by default as a safety measure against creating too many threads. This relates to #3478, where we set the default to 24, but 32 is probably a better default.

Closes #3545
This commit is contained in:
parent
9af7a850e9
commit
d442e089ac
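Before the per-file hunks, here is a minimal illustrative sketch of the rule this commit applies: the detected processor count is capped at 32 before defaults are derived from it. The class and variable names below are hypothetical and are not code from the commit.

// Illustrative sketch only (hypothetical names), assuming the bounding rule described above.
public class BoundedDefaultsSketch {

    // Same idea as EsExecutors.boundedNumberOfProcessors(): cap at 32.
    static int boundedNumberOfProcessors() {
        return Math.min(32, Runtime.getRuntime().availableProcessors());
    }

    public static void main(String[] args) {
        int bounded = boundedNumberOfProcessors();
        int defaultNettyWorkers = bounded * 2;  // default worker_count for HTTP and transport in this diff
        int defaultFixedPoolSize = bounded;     // default "size" for fixed/scaling pools in this diff
        // e.g. on a 64-core machine: bounded = 32, workers = 64, pool size = 32
        System.out.println(bounded + ", " + defaultNettyWorkers + ", " + defaultFixedPoolSize);
    }
}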
EsExecutors.java

@@ -30,6 +30,17 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class EsExecutors {
 
+    /**
+     * Returns the number of processors available but at most <tt>32</tt>.
+     */
+    public static int boundedNumberOfProcessors() {
+        /* This relates to issues where machines with large number of cores
+         * ie. >= 48 create too many threads and run into OOM see #3478
+         * We just use an 32 core upper-bound here to not stress the system
+         * too much with too many created threads */
+        return Math.min(32, Runtime.getRuntime().availableProcessors());
+    }
+
     public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) {
         return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory);
     }
NettyHttpServerTransport.java

@@ -33,10 +33,10 @@ import org.elasticsearch.common.transport.NetworkExceptionHelper;
 import org.elasticsearch.common.transport.PortsRange;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.http.*;
 import org.elasticsearch.http.HttpRequest;
 import org.elasticsearch.monitor.jvm.JvmInfo;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.BindTransportException;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.*;

@@ -127,7 +127,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
         this.resetCookies = componentSettings.getAsBoolean("reset_cookies", settings.getAsBoolean("http.reset_cookies", false));
         this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
         this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
-        this.workerCount = componentSettings.getAsInt("worker_count", ThreadPool.boundedNumberOfProcessors() * 2);
+        this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2);
         this.blockingServer = settings.getAsBoolean("http.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
         this.port = componentSettings.get("port", settings.get("http.port", "9200-9300"));
         this.bindHost = componentSettings.get("bind_host", settings.get("http.bind_host", settings.get("http.host")));
ThreadPool.java

@@ -99,7 +99,7 @@ public class ThreadPool extends AbstractComponent {
 
         Map<String, Settings> groupSettings = settings.getGroups(THREADPOOL_GROUP);
 
-        int availableProcessors = boundedNumberOfProcessors();
+        int availableProcessors = EsExecutors.boundedNumberOfProcessors();
         int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5);
         int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10);
         defaultExecutorTypeSettings = ImmutableMap.<String, Settings>builder()

@@ -296,7 +296,7 @@ public class ThreadPool extends AbstractComponent {
             Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory);
             return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
         } else if ("fixed".equals(type)) {
-            int defaultSize = defaultSettings.getAsInt("size", boundedNumberOfProcessors() * 5);
+            int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors());
             SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null));
             String defaultRejectSetting = defaultSettings.get("reject_policy", "abort");
             String defaultQueueType = defaultSettings.get("queue_type", "linked");

@@ -335,7 +335,7 @@ public class ThreadPool extends AbstractComponent {
         } else if ("scaling".equals(type)) {
             TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5));
             int defaultMin = defaultSettings.getAsInt("min", 1);
-            int defaultSize = defaultSettings.getAsInt("size", boundedNumberOfProcessors() * 5);
+            int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors());
             if (previousExecutorHolder != null) {
                 if ("scaling".equals(previousInfo.getType())) {
                     TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive());

@@ -690,17 +690,6 @@ public class ThreadPool extends AbstractComponent {
 
     }
 
-    /**
-     * Returns the number of processors available but at most <tt>24</tt>.
-     */
-    public static int boundedNumberOfProcessors() {
-        /* This relates to issues where machines with large number of cores
-         * ie. >= 48 create too many threads and run into OOM see #3478
-         * We just use an 24 core upper-bound here to not stress the system
-         * too much with too many created threads */
-        return Math.min(24, Runtime.getRuntime().availableProcessors());
-    }
-
     class ApplySettings implements NodeSettingsService.Listener {
         @Override
         public void onRefreshSettings(Settings settings) {
NettyTransport.java

@@ -43,6 +43,7 @@ import org.elasticsearch.common.transport.PortsRange;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.*;

@@ -171,7 +172,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
             connectMutex[i] = new Object();
         }
 
-        this.workerCount = componentSettings.getAsInt("worker_count", ThreadPool.boundedNumberOfProcessors() * 2);
+        this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2);
         this.bossCount = componentSettings.getAsInt("boss_count", 1);
         this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false)));
         this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false)));