diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 80c31d7db3d..ee13a75dc8a 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -33,12 +33,12 @@ public class EsExecutors { /** * Returns the number of processors available but at most 32. */ - public static int boundedNumberOfProcessors() { + public static int boundedNumberOfProcessors(Settings settings) { /* This relates to issues where machines with large number of cores * ie. >= 48 create too many threads and run into OOM see #3478 * We just use an 32 core upper-bound here to not stress the system * too much with too many created threads */ - return Math.min(32, Runtime.getRuntime().availableProcessors()); + return settings.getAsInt("processors", Math.min(32, Runtime.getRuntime().availableProcessors())); } public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(ThreadFactory threadFactory) { diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index 9146467fabb..27af829b1ea 100644 --- a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -127,7 +127,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent groupSettings = settings.getGroups(THREADPOOL_GROUP); - int availableProcessors = EsExecutors.boundedNumberOfProcessors(); + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); int halfProcMaxAt5 = Math.min(((availableProcessors + 1) / 2), 5); int halfProcMaxAt10 = Math.min(((availableProcessors + 1) / 2), 10); defaultExecutorTypeSettings = ImmutableMap.builder() @@ -296,7 +296,7 @@ public class ThreadPool extends AbstractComponent { Executor executor = EsExecutors.newCached(keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory); return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); } else if ("fixed".equals(type)) { - int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors()); + int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null)); if (previousExecutorHolder != null) { @@ -327,7 +327,7 @@ public class ThreadPool extends AbstractComponent { } else if ("scaling".equals(type)) { TimeValue defaultKeepAlive = defaultSettings.getAsTime("keep_alive", timeValueMinutes(5)); int defaultMin = defaultSettings.getAsInt("min", 1); - int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors()); + int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); if (previousExecutorHolder != null) { if ("scaling".equals(previousInfo.getType())) { TimeValue updatedKeepAlive = settings.getAsTime("keep_alive", previousInfo.getKeepAlive()); diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index f641436b6ad..dc7e3ac9285 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -152,8 +152,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem private volatile BoundTransportAddress boundAddress; - private final KeyedLock connectionLock = new KeyedLock(); - + private final KeyedLock connectionLock = new KeyedLock(); + // this lock is here to make sure we close this transport and disconnect all the client nodes // connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?) private final ReadWriteLock globalLock = new ReentrantReadWriteLock(); @@ -169,7 +169,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem System.setProperty("org.jboss.netty.epollBugWorkaround", "true"); } - this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors() * 2); + this.workerCount = componentSettings.getAsInt("worker_count", EsExecutors.boundedNumberOfProcessors(settings) * 2); this.bossCount = componentSettings.getAsInt("boss_count", 1); this.blockingServer = settings.getAsBoolean("transport.tcp.blocking_server", settings.getAsBoolean(TCP_BLOCKING_SERVER, settings.getAsBoolean(TCP_BLOCKING, false))); this.blockingClient = settings.getAsBoolean("transport.tcp.blocking_client", settings.getAsBoolean(TCP_BLOCKING_CLIENT, settings.getAsBoolean(TCP_BLOCKING, false))); @@ -591,7 +591,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels != null) { return; - } + } connectionLock.acquire(node.id()); try { if (!lifecycle.started()) { @@ -755,12 +755,12 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (nodeChannels != null) { connectionLock.acquire(node.id()); try { - try { - nodeChannels.close(); - } finally { - logger.debug("disconnected from [{}]", node); - transportServiceAdapter.raiseNodeDisconnected(node); - } + try { + nodeChannels.close(); + } finally { + logger.debug("disconnected from [{}]", node); + transportServiceAdapter.raiseNodeDisconnected(node); + } } finally { connectionLock.release(node.id()); } @@ -774,7 +774,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem NodeChannels nodeChannels = connectedNodes.get(node); if (nodeChannels != null && nodeChannels.hasChannel(channel)) { connectionLock.acquire(node.id()); - if (!nodeChannels.hasChannel(channel)){ //might have been removed in the meanwhile, safety check + if (!nodeChannels.hasChannel(channel)) { //might have been removed in the meanwhile, safety check assert !connectedNodes.containsKey(node); } else { try {