diff --git a/modules/transport-netty4/build.gradle b/modules/transport-netty4/build.gradle index 6bc64a178d9..c35dbe8ff29 100644 --- a/modules/transport-netty4/build.gradle +++ b/modules/transport-netty4/build.gradle @@ -47,6 +47,22 @@ dependencyLicenses { mapping from: /netty-.*/, to: 'netty' } +test { + /* + * We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each + * other if we allow them to set the number of available processors as it's set-once in Netty. + */ + systemProperty 'es.set.netty.runtime.available.processors', 'false' +} + +integTestRunner { + /* + * We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each + * other if we allow them to set the number of available processors as it's set-once in Netty. + */ + systemProperty 'es.set.netty.runtime.available.processors', 'false' +} + thirdPartyAudit.excludes = [ // classes are missing diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index 267a0d3dbfd..769ef34a789 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -209,6 +209,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) { super(settings); + Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings)); this.networkService = networkService; this.bigArrays = bigArrays; this.threadPool = threadPool; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index 4b515aab869..8adeb665e04 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -134,6 +134,7 @@ public class Netty4Transport extends TcpTransport { public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService); + Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings)); this.workerCount = WORKER_COUNT.get(settings); this.maxCumulationBufferCapacity = NETTY_MAX_CUMULATION_BUFFER_CAPACITY.get(settings); this.maxCompositeBufferComponents = NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java index 4d28bf9a257..d71e1ee9376 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java @@ -24,10 +24,12 @@ import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; +import io.netty.util.NettyRuntime; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.common.Booleans; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.ESLoggerFactory; @@ -36,6 +38,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class Netty4Utils { @@ -55,6 +59,41 @@ public class Netty4Utils { } + private static AtomicBoolean isAvailableProcessorsSet = new AtomicBoolean(); + + /** + * Set the number of available processors that Netty uses for sizing various resources (e.g., thread pools). + * + * @param availableProcessors the number of available processors + * @throws IllegalStateException if available processors was set previously and the specified value does not match the already-set value + */ + public static void setAvailableProcessors(final int availableProcessors) { + // we set this to false in tests to avoid tests that randomly set processors from stepping on each other + final boolean set = Booleans.parseBoolean(System.getProperty("es.set.netty.runtime.available.processors", "true")); + if (!set) { + return; + } + + /* + * This can be invoked twice, once from Netty4Transport and another time from Netty4HttpServerTransport; however, + * Netty4Runtime#availableProcessors forbids settings the number of processors twice so we prevent double invocation here. + */ + if (isAvailableProcessorsSet.compareAndSet(false, true)) { + NettyRuntime.setAvailableProcessors(availableProcessors); + } else if (availableProcessors != NettyRuntime.availableProcessors()) { + /* + * We have previously set the available processors yet either we are trying to set it to a different value now or there is a bug + * in Netty and our previous value did not take, bail. + */ + final String message = String.format( + Locale.ROOT, + "available processors value [%d] did not match current value [%d]", + availableProcessors, + NettyRuntime.availableProcessors()); + throw new IllegalStateException(message); + } + } + /** * Turns the given BytesReference into a ByteBuf. Note: the returned ByteBuf will reference the internal * pages of the BytesReference. Don't free the bytes of reference before the ByteBuf goes out of scope. diff --git a/qa/smoke-test-http/build.gradle b/qa/smoke-test-http/build.gradle index d15a4b5d900..03440912d04 100644 --- a/qa/smoke-test-http/build.gradle +++ b/qa/smoke-test-http/build.gradle @@ -24,3 +24,11 @@ apply plugin: 'elasticsearch.test-with-dependencies' dependencies { testCompile project(path: ':modules:transport-netty4', configuration: 'runtime') // for http } + +integTestRunner { + /* + * We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each + * other if we allow them to set the number of available processors as it's set-once in Netty. + */ + systemProperty 'es.set.netty.runtime.available.processors', 'false' +}