Set available processors for Netty
Netty uses the number of processors for sizing various resources (e.g., thread pools, buffer pools, etc.). However, it uses the runtime number of available processors which might not match the configured number of processors as set in Elasticsearch to limit the number of threads (for example, in Docker containers). A new feature was added to Netty that enables configuring the number of processors Netty should see for sizing this various resources. This commit takes advantage of this feature to set this number of available processors to be equal to the configured number of processors set in Elasticsearch. Relates #24420
This commit is contained in:
parent
e88d54bf0a
commit
40ff169c54
|
@ -47,6 +47,22 @@ dependencyLicenses {
|
|||
mapping from: /netty-.*/, to: 'netty'
|
||||
}
|
||||
|
||||
test {
|
||||
/*
|
||||
* We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each
|
||||
* other if we allow them to set the number of available processors as it's set-once in Netty.
|
||||
*/
|
||||
systemProperty 'es.set.netty.runtime.available.processors', 'false'
|
||||
}
|
||||
|
||||
integTestRunner {
|
||||
/*
|
||||
* We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each
|
||||
* other if we allow them to set the number of available processors as it's set-once in Netty.
|
||||
*/
|
||||
systemProperty 'es.set.netty.runtime.available.processors', 'false'
|
||||
}
|
||||
|
||||
thirdPartyAudit.excludes = [
|
||||
// classes are missing
|
||||
|
||||
|
|
|
@ -209,6 +209,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
|
|||
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
|
||||
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
|
||||
super(settings);
|
||||
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
|
||||
this.networkService = networkService;
|
||||
this.bigArrays = bigArrays;
|
||||
this.threadPool = threadPool;
|
||||
|
|
|
@ -134,6 +134,7 @@ public class Netty4Transport extends TcpTransport<Channel> {
|
|||
public Netty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
super("netty", settings, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
|
||||
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
|
||||
this.workerCount = WORKER_COUNT.get(settings);
|
||||
this.maxCumulationBufferCapacity = NETTY_MAX_CUMULATION_BUFFER_CAPACITY.get(settings);
|
||||
this.maxCompositeBufferComponents = NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
|
||||
|
|
|
@ -24,10 +24,12 @@ import io.netty.buffer.CompositeByteBuf;
|
|||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.util.NettyRuntime;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.common.Booleans;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.logging.ESLoggerFactory;
|
||||
|
||||
|
@ -36,6 +38,8 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class Netty4Utils {
|
||||
|
@ -55,6 +59,41 @@ public class Netty4Utils {
|
|||
|
||||
}
|
||||
|
||||
private static AtomicBoolean isAvailableProcessorsSet = new AtomicBoolean();
|
||||
|
||||
/**
|
||||
* Set the number of available processors that Netty uses for sizing various resources (e.g., thread pools).
|
||||
*
|
||||
* @param availableProcessors the number of available processors
|
||||
* @throws IllegalStateException if available processors was set previously and the specified value does not match the already-set value
|
||||
*/
|
||||
public static void setAvailableProcessors(final int availableProcessors) {
|
||||
// we set this to false in tests to avoid tests that randomly set processors from stepping on each other
|
||||
final boolean set = Booleans.parseBoolean(System.getProperty("es.set.netty.runtime.available.processors", "true"));
|
||||
if (!set) {
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* This can be invoked twice, once from Netty4Transport and another time from Netty4HttpServerTransport; however,
|
||||
* Netty4Runtime#availableProcessors forbids settings the number of processors twice so we prevent double invocation here.
|
||||
*/
|
||||
if (isAvailableProcessorsSet.compareAndSet(false, true)) {
|
||||
NettyRuntime.setAvailableProcessors(availableProcessors);
|
||||
} else if (availableProcessors != NettyRuntime.availableProcessors()) {
|
||||
/*
|
||||
* We have previously set the available processors yet either we are trying to set it to a different value now or there is a bug
|
||||
* in Netty and our previous value did not take, bail.
|
||||
*/
|
||||
final String message = String.format(
|
||||
Locale.ROOT,
|
||||
"available processors value [%d] did not match current value [%d]",
|
||||
availableProcessors,
|
||||
NettyRuntime.availableProcessors());
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Turns the given BytesReference into a ByteBuf. Note: the returned ByteBuf will reference the internal
|
||||
* pages of the BytesReference. Don't free the bytes of reference before the ByteBuf goes out of scope.
|
||||
|
|
|
@ -24,3 +24,11 @@ apply plugin: 'elasticsearch.test-with-dependencies'
|
|||
dependencies {
|
||||
testCompile project(path: ':modules:transport-netty4', configuration: 'runtime') // for http
|
||||
}
|
||||
|
||||
integTestRunner {
|
||||
/*
|
||||
* We have to disable setting the number of available processors as tests in the same JVM randomize processors and will step on each
|
||||
* other if we allow them to set the number of available processors as it's set-once in Netty.
|
||||
*/
|
||||
systemProperty 'es.set.netty.runtime.available.processors', 'false'
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue