Remove option to enable direct buffer pooling (#48310)

This commit removes the option to change the netty system properties to
reenable the direct buffer pooling. It also removes the need for us to
disable the buffer pooling in the system properties file. Instead, we
programmatically craete an allocator that is used by our networking
layer.

This commit does introduce an Elasticsearch property which allows the
user to fallback on the netty default allocator. If they choose this
option, they can configure the default allocator how they wish using the
standard netty properties.
This commit is contained in:
Tim Brooks 2019-10-21 19:15:50 -06:00 committed by GitHub
parent 0d12ef8958
commit 547e399dbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 259 additions and 65 deletions

View File

@ -907,7 +907,6 @@ class BuildPlugin implements Plugin<Project> {
test.systemProperty('io.netty.noUnsafe', 'true')
test.systemProperty('io.netty.noKeySetOptimization', 'true')
test.systemProperty('io.netty.recycler.maxCapacityPerThread', '0')
test.systemProperty('io.netty.allocator.numDirectArenas', '0')
test.testLogging { TestLoggingContainer logging ->
logging.showExceptions = true

View File

@ -82,7 +82,6 @@
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0
-Dio.netty.allocator.numDirectArenas=0
# log4j 2
-Dlog4j.shutdownHookEnabled=false

View File

@ -56,14 +56,6 @@ final class JvmErgonomics {
final List<String> ergonomicChoices = new ArrayList<>();
final Map<String, Optional<String>> finalJvmOptions = finalJvmOptions(userDefinedJvmOptions);
final long heapSize = extractHeapSize(finalJvmOptions);
final Map<String, String> systemProperties = extractSystemProperties(userDefinedJvmOptions);
if (systemProperties.containsKey("io.netty.allocator.type") == false) {
if (heapSize <= 1 << 30) {
ergonomicChoices.add("-Dio.netty.allocator.type=unpooled");
} else {
ergonomicChoices.add("-Dio.netty.allocator.type=pooled");
}
}
final long maxDirectMemorySize = extractMaxDirectMemorySize(finalJvmOptions);
if (maxDirectMemorySize == 0) {
ergonomicChoices.add("-XX:MaxDirectMemorySize=" + heapSize / 2);

View File

@ -123,22 +123,7 @@ public class JvmErgonomicsTests extends LaunchersTestCase {
Map<String, String> parsedSystemProperties = JvmErgonomics.extractSystemProperties(Arrays.asList("-Xms1024M", "-Xmx1024M"));
assertTrue(parsedSystemProperties.isEmpty());
}
public void testPooledMemoryChoiceOnSmallHeap() throws InterruptedException, IOException {
final String smallHeap = randomFrom(Arrays.asList("64M", "512M", "1024M", "1G"));
assertThat(
JvmErgonomics.choose(Arrays.asList("-Xms" + smallHeap, "-Xmx" + smallHeap)),
hasItem("-Dio.netty.allocator.type=unpooled"));
}
public void testPooledMemoryChoiceOnNotSmallHeap() throws InterruptedException, IOException {
assumeFalse(System.getProperty("os.name").startsWith("Windows") && JavaVersion.majorVersion(JavaVersion.CURRENT) == 8);
final String largeHeap = randomFrom(Arrays.asList("1025M", "2048M", "2G", "8G"));
assertThat(
JvmErgonomics.choose(Arrays.asList("-Xms" + largeHeap, "-Xmx" + largeHeap)),
hasItem("-Dio.netty.allocator.type=pooled"));
}
public void testMaxDirectMemorySizeChoice() throws InterruptedException, IOException {
assumeFalse(System.getProperty("os.name").startsWith("Windows") && JavaVersion.majorVersion(JavaVersion.CURRENT) == 8);
final Map<String, String> heapMaxDirectMemorySize = new HashMap<>();

View File

@ -67,7 +67,7 @@ integTestRunner {
TaskProvider<Test> pooledTest = tasks.register("pooledTest", Test) {
include '**/*Tests.class'
systemProperty 'es.set.netty.runtime.available.processors', 'false'
systemProperty 'io.netty.allocator.type', 'pooled'
systemProperty 'es.use_unpooled_allocator', 'false'
}
// TODO: we can't use task avoidance here because RestIntegTestTask does the testcluster creation
RestIntegTestTask pooledIntegTest = tasks.create("pooledIntegTest", RestIntegTestTask) {
@ -76,7 +76,7 @@ RestIntegTestTask pooledIntegTest = tasks.create("pooledIntegTest", RestIntegTes
}
}
testClusters.pooledIntegTest {
systemProperty 'io.netty.allocator.type', 'pooled'
systemProperty 'es.use_unpooled_allocator', 'false'
}
check.dependsOn(pooledTest, pooledIntegTest)

View File

@ -20,7 +20,6 @@
package org.elasticsearch.http.netty4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
@ -32,7 +31,6 @@ import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
@ -63,7 +61,7 @@ import org.elasticsearch.http.HttpReadTimeoutException;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.net.InetSocketAddress;
@ -186,14 +184,12 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
// If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
// channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer
// per-event-loop thread to be used for IO operations.
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
serverBootstrap.channel(NioServerSocketChannel.class);
} else {
serverBootstrap.channel(CopyBytesServerSocketChannel.class);
}
// NettyAllocator will return the channel type designed to work with the configuredAllocator
serverBootstrap.channel(NettyAllocator.getServerChannelType());
// Set the allocators for both the server channel and the child channels created
serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
serverBootstrap.childHandler(configureServerChannelHandler());
serverBootstrap.handler(new ServerChannelExceptionHandler(this));

View File

@ -0,0 +1,214 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.monitor.jvm.JvmInfo;
public class NettyAllocator {
private static final ByteBufAllocator ALLOCATOR;
private static final String USE_UNPOOLED = "es.use_unpooled_allocator";
private static final String USE_NETTY_DEFAULT = "es.unsafe.use_netty_default_allocator";
static {
if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) {
ALLOCATOR = ByteBufAllocator.DEFAULT;
} else {
ByteBufAllocator delegate;
if (useUnpooled()) {
delegate = new NoDirectBuffers(UnpooledByteBufAllocator.DEFAULT);
} else {
int nHeapArena = PooledByteBufAllocator.defaultNumHeapArena();
int pageSize = PooledByteBufAllocator.defaultPageSize();
int maxOrder = PooledByteBufAllocator.defaultMaxOrder();
int tinyCacheSize = PooledByteBufAllocator.defaultTinyCacheSize();
int smallCacheSize = PooledByteBufAllocator.defaultSmallCacheSize();
int normalCacheSize = PooledByteBufAllocator.defaultNormalCacheSize();
boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();
delegate = new PooledByteBufAllocator(false, nHeapArena, 0, pageSize, maxOrder, tinyCacheSize,
smallCacheSize, normalCacheSize, useCacheForAllThreads);
}
ALLOCATOR = new NoDirectBuffers(delegate);
}
}
public static boolean useCopySocket() {
return ALLOCATOR instanceof NoDirectBuffers;
}
public static ByteBufAllocator getAllocator() {
return ALLOCATOR;
}
public static Class<? extends Channel> getChannelType() {
if (ALLOCATOR instanceof NoDirectBuffers) {
return CopyBytesSocketChannel.class;
} else {
return NioSocketChannel.class;
}
}
public static Class<? extends ServerChannel> getServerChannelType() {
if (ALLOCATOR instanceof NoDirectBuffers) {
return CopyBytesServerSocketChannel.class;
} else {
return NioServerSocketChannel.class;
}
}
private static boolean useUnpooled() {
if (System.getProperty(USE_UNPOOLED) != null) {
return Booleans.parseBoolean(System.getProperty(USE_UNPOOLED));
} else {
long heapSize = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
return heapSize <= 1 << 30;
}
}
private static class NoDirectBuffers implements ByteBufAllocator {
private final ByteBufAllocator delegate;
private NoDirectBuffers(ByteBufAllocator delegate) {
this.delegate = delegate;
}
@Override
public ByteBuf buffer() {
return heapBuffer();
}
@Override
public ByteBuf buffer(int initialCapacity) {
return heapBuffer(initialCapacity);
}
@Override
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
return heapBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf ioBuffer() {
return heapBuffer();
}
@Override
public ByteBuf ioBuffer(int initialCapacity) {
return heapBuffer(initialCapacity);
}
@Override
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
return heapBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf heapBuffer() {
return delegate.heapBuffer();
}
@Override
public ByteBuf heapBuffer(int initialCapacity) {
return delegate.heapBuffer(initialCapacity);
}
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
return delegate.heapBuffer(initialCapacity, maxCapacity);
}
@Override
public ByteBuf directBuffer() {
// TODO: Currently the Netty SslHandler requests direct ByteBufs even when interacting with the
// JDK SSLEngine. This will be fixed in a future version of Netty. For now, return a heap
// ByteBuf. After a Netty upgrade, return to throwing UnsupportedOperationException
return heapBuffer();
}
@Override
public ByteBuf directBuffer(int initialCapacity) {
// TODO: Currently the Netty SslHandler requests direct ByteBufs even when interacting with the
// JDK SSLEngine. This will be fixed in a future version of Netty. For now, return a heap
// ByteBuf. After a Netty upgrade, return to throwing UnsupportedOperationException
return heapBuffer(initialCapacity);
}
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
// TODO: Currently the Netty SslHandler requests direct ByteBufs even when interacting with the
// JDK SSLEngine. This will be fixed in a future version of Netty. For now, return a heap
// ByteBuf. After a Netty upgrade, return to throwing UnsupportedOperationException
return heapBuffer(initialCapacity, maxCapacity);
}
@Override
public CompositeByteBuf compositeBuffer() {
return compositeHeapBuffer();
}
@Override
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
return compositeHeapBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeHeapBuffer() {
return delegate.compositeHeapBuffer();
}
@Override
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
return delegate.compositeHeapBuffer(maxNumComponents);
}
@Override
public CompositeByteBuf compositeDirectBuffer() {
throw new UnsupportedOperationException("Direct buffers not supported.");
}
@Override
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
throw new UnsupportedOperationException("Direct buffers not supported.");
}
@Override
public boolean isDirectBufferPooled() {
assert delegate.isDirectBufferPooled() == false;
return false;
}
@Override
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
return delegate.calculateNewCapacity(minNewCapacity, maxCapacity);
}
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.transport.netty4;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -34,8 +33,6 @@ import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.LogManager;
@ -59,8 +56,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.net.NetUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
import org.elasticsearch.transport.CopyBytesSocketChannel;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;
@ -152,13 +148,9 @@ public class Netty4Transport extends TcpTransport {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
// If direct buffer pooling is disabled, use the CopyBytesSocketChannel which will pool a single
// direct buffer per-event-loop thread which will be used for IO operations.
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
bootstrap.channel(NioSocketChannel.class);
} else {
bootstrap.channel(CopyBytesSocketChannel.class);
}
// NettyAllocator will return the channel type designed to work with the configured allocator
bootstrap.channel(NettyAllocator.getChannelType());
bootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
@ -216,14 +208,12 @@ public class Netty4Transport extends TcpTransport {
serverBootstrap.group(eventLoopGroup);
// If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
// channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer
// per-event-loop thread to be used for IO operations.
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
serverBootstrap.channel(NioServerSocketChannel.class);
} else {
serverBootstrap.channel(CopyBytesServerSocketChannel.class);
}
// NettyAllocator will return the channel type designed to work with the configuredAllocator
serverBootstrap.channel(NettyAllocator.getServerChannelType());
// Set the allocators for both the server channel and the child channels created
serverBootstrap.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator());
serverBootstrap.childHandler(getServerChannelInitializer(name));
serverBootstrap.handler(new ServerChannelExceptionHandler());

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch;
import io.netty.buffer.PooledByteBufAllocator;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
@ -25,8 +26,8 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.Netty4Plugin;
import org.elasticsearch.transport.netty4.Netty4Transport;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
public abstract class ESNetty4IntegTestCase extends ESIntegTestCase {
@ -61,12 +62,19 @@ public abstract class ESNetty4IntegTestCase extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(Netty4Plugin.class);
return Collections.singletonList(Netty4Plugin.class);
}
@Override
public void tearDown() throws Exception {
super.tearDown();
assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedHeapMemory());
assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory());
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return Arrays.asList(Netty4Plugin.class);
return Collections.singletonList(Netty4Plugin.class);
}
}

View File

@ -25,10 +25,10 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
@ -45,6 +45,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.NettyAllocator;
import java.io.Closeable;
import java.net.SocketAddress;
@ -84,7 +85,10 @@ class Netty4HttpClient implements Closeable {
private final Bootstrap clientBootstrap;
Netty4HttpClient() {
clientBootstrap = new Bootstrap().channel(NioSocketChannel.class).group(new NioEventLoopGroup());
clientBootstrap = new Bootstrap()
.channel(NettyAllocator.getChannelType())
.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
.group(new NioEventLoopGroup(1));
}
public Collection<FullHttpResponse> get(SocketAddress remoteAddress, String... uris) throws InterruptedException {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport.netty4;
import io.netty.buffer.PooledByteBufAllocator;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -45,6 +46,13 @@ import static org.hamcrest.Matchers.containsString;
public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase {
@Override
public void tearDown() throws Exception {
super.tearDown();
assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedHeapMemory());
assertEquals(0, PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory());
}
@Override
protected Transport build(Settings settings, final Version version, ClusterSettings clusterSettings, boolean doHandshake) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
@ -73,5 +81,4 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase
assertThat(e.getMessage(), containsString("[127.0.0.1:9876]"));
}
}
}