Disable netty direct buffer pooling by default (#44837)

Elasticsearch does not grant Netty reflection access to get Unsafe. The
only mechanism that currently exists to free direct buffers in a timely
manner is to use Unsafe. This leads to the occasional scenario, under
heavy network load, that direct byte buffers can slowly build up without
being freed.

This commit disables Netty direct buffer pooling and moves to a strategy
of using a single thread-local direct buffer for interfacing with sockets.
This will reduce the memory usage from networking. Elasticsearch
currently derives very little value from direct buffer usage (TLS,
compression, Lucene, Elasticsearch handling, etc. all use heap bytes). So
this seems like the correct trade-off until that changes.
This commit is contained in:
Tim Brooks 2019-08-08 16:54:01 -04:00
parent b19de55095
commit af908efa41
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
8 changed files with 304 additions and 5 deletions

View File

@ -64,6 +64,9 @@ final class JvmErgonomics {
ergonomicChoices.add("-Dio.netty.allocator.type=pooled");
}
}
if (systemProperties.containsKey("io.netty.allocator.numDirectArenas") == false) {
ergonomicChoices.add("-Dio.netty.allocator.numDirectArenas=0");
}
final long maxDirectMemorySize = extractMaxDirectMemorySize(finalJvmOptions);
if (maxDirectMemorySize == 0) {
ergonomicChoices.add("-XX:MaxDirectMemorySize=" + heapSize / 2);

View File

@ -52,6 +52,9 @@ test {
* other if we allow them to set the number of available processors as it's set-once in Netty.
*/
systemProperty 'es.set.netty.runtime.available.processors', 'false'
// Disable direct buffer pooling as it is disabled by default in Elasticsearch
systemProperty 'io.netty.allocator.numDirectArenas', '0'
}
integTestRunner {
@ -60,6 +63,9 @@ integTestRunner {
* other if we allow them to set the number of available processors as it's set-once in Netty.
*/
systemProperty 'es.set.netty.runtime.available.processors', 'false'
// Disable direct buffer pooling as it is disabled by default in Elasticsearch
systemProperty 'io.netty.allocator.numDirectArenas', '0'
}
thirdPartyAudit {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.http.netty4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
@ -62,6 +63,7 @@ import org.elasticsearch.http.HttpReadTimeoutException;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.net.InetSocketAddress;
@ -145,7 +147,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final int maxCompositeBufferComponents;
protected volatile ServerBootstrap serverBootstrap;
private volatile ServerBootstrap serverBootstrap;
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
@ -183,7 +185,15 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
// If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
// channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pools a single direct buffer
// per-event-loop thread to be used for IO operations.
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
serverBootstrap.channel(NioServerSocketChannel.class);
} else {
serverBootstrap.channel(CopyBytesServerSocketChannel.class);
}
serverBootstrap.childHandler(configureServerChannelHandler());
serverBootstrap.handler(new ServerChannelExceptionHandler(this));

View File

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.internal.SocketUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.channels.SocketChannel;
import java.util.List;
/**
 * Server-side counterpart of {@link CopyBytesSocketChannel}, adapted from Netty's
 * {@link NioServerSocketChannel}. The only behavior it changes is how accepted connections are
 * wrapped: every accepted socket becomes a {@link CopyBytesSocketChannel} child channel.
 */
public class CopyBytesServerSocketChannel extends NioServerSocketChannel {

    private static final Logger logger = LogManager.getLogger(CopyBytesServerSocketChannel.class);

    /**
     * Accepts at most one pending connection and appends the resulting child channel to {@code buf}.
     *
     * @param buf the list that receives the newly created child channel
     * @return {@code 1} if a child channel was added, {@code 0} if nothing was accepted or the child
     *         channel could not be created
     * @throws Exception if accepting the connection itself fails
     */
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        final SocketChannel accepted = SocketUtils.accept(javaChannel());
        if (accepted == null) {
            // Nothing was pending on the server socket.
            return 0;
        }

        try {
            buf.add(new CopyBytesSocketChannel(this, accepted));
            return 1;
        } catch (Throwable failure) {
            logger.warn("Failed to create a new channel from an accepted socket.", failure);
            closeAfterFailure(accepted);
        }
        return 0;
    }

    /**
     * Best-effort close of a socket whose child channel could not be created; a failure to close is
     * logged and otherwise ignored.
     */
    private static void closeAfterFailure(SocketChannel socket) {
        try {
            socket.close();
        } catch (Throwable closeFailure) {
            logger.warn("Failed to close a socket.", closeFailure);
        }
    }
}

View File

@ -0,0 +1,186 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.elasticsearch.common.SuppressForbidden;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Objects;
import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
/**
 * This class is adapted from {@link NioSocketChannel} class in the Netty project. It overrides the channel
 * read/write behavior to ensure that the bytes are always copied to a thread-local direct bytes buffer. This
 * happens BEFORE the call to the Java {@link SocketChannel} is issued.
 *
 * The purpose of this class is to allow the disabling of netty direct buffer pooling while allowing us to
 * control how bytes end up being copied to direct memory. If we simply disabled netty pooling, we would rely
 * on the JDK's internal thread local buffer pooling. Instead, this class allows us to create one thread-local
 * buffer with a defined size.
 */
@SuppressForbidden(reason = "Channel#write")
public class CopyBytesSocketChannel extends NioSocketChannel {

    /** Size of the thread-local direct buffer and upper bound for a single socket read or write. */
    private static final int MAX_BYTES_PER_WRITE = 1 << 20;

    /** One direct buffer per thread, created on first use and reused for every IO operation. */
    private static final ThreadLocal<ByteBuffer> ioBuffer =
        ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(MAX_BYTES_PER_WRITE));

    private final WriteConfig writeConfig = new WriteConfig();

    public CopyBytesSocketChannel() {
        super();
    }

    CopyBytesSocketChannel(Channel parent, SocketChannel socket) {
        super(parent, socket);
    }

    /**
     * Flushes pending outbound data by copying it into the thread-local direct buffer and writing that
     * buffer to the socket, looping until the outbound buffer is empty, the socket stops accepting
     * bytes, or the configured write spin count is exhausted.
     *
     * @param in the outbound buffer holding the pending writes
     * @throws Exception if the underlying socket write fails
     */
    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
        int writeSpinCount = config().getWriteSpinCount();
        do {
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

            // Ensure the pending writes are made of ByteBufs only.
            int maxBytesPerGatheringWrite = writeConfig.getMaxBytesPerGatheringWrite();
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            assert Arrays.stream(nioBuffers).filter(Objects::nonNull).noneMatch(ByteBuffer::isDirect) : "Expected all to be heap buffers";
            int nioBufferCnt = in.nioBufferCount();

            if (nioBufferCnt == 0) {// We have something else beside ByteBuffers to write so fallback to normal writes.
                writeSpinCount -= doWrite0(in);
            } else {
                // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
                // to check if the total size of all the buffers is non-zero.
                ByteBuffer directBuffer = getIoBuffer();
                copyBytes(nioBuffers, nioBufferCnt, directBuffer);
                directBuffer.flip();

                int attemptedBytes = directBuffer.remaining();
                final int localWrittenBytes = ch.write(directBuffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                // The outbound buffer caches and reuses these NIO buffers, so their positions must advance by
                // exactly the number of bytes the socket accepted (not the number copied). Otherwise bytes that
                // were copied but not written would be skipped on the next pass.
                setWrittenBytes(nioBuffers, nioBufferCnt, localWrittenBytes);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
            }
        } while (writeSpinCount > 0);

        incompleteWrite(writeSpinCount < 0);
    }

    /**
     * Reads from the socket into the thread-local direct buffer and copies the bytes into {@code byteBuf}.
     * The read is capped at the destination's writable bytes so that the attempted size reported to the
     * receive allocator matches what can actually be read, and the destination never has to grow.
     *
     * @param byteBuf the destination buffer
     * @return the value returned by the socket read (number of bytes read, possibly zero or negative)
     * @throws Exception if the underlying socket read fails
     */
    @Override
    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        final int writableBytes = Math.min(byteBuf.writableBytes(), MAX_BYTES_PER_WRITE);
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.attemptedBytesRead(writableBytes);
        ByteBuffer directBuffer = getIoBuffer();
        directBuffer.limit(writableBytes);
        int bytesRead = javaChannel().read(directBuffer);
        directBuffer.flip();
        if (bytesRead > 0) {
            byteBuf.writeBytes(directBuffer);
        }
        return bytesRead;
    }

    /** Returns the calling thread's direct buffer, cleared and ready to be filled. */
    private static ByteBuffer getIoBuffer() {
        ByteBuffer buffer = CopyBytesSocketChannel.ioBuffer.get();
        buffer.clear();
        return buffer;
    }

    private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
        // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
        // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
        // make a best effort to adjust as OS behavior changes.
        if (attempted == written) {
            if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
                writeConfig.setMaxBytesPerGatheringWrite(attempted << 1);
            }
        } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
            writeConfig.setMaxBytesPerGatheringWrite(attempted >>> 1);
        }
    }

    /**
     * Copies as many bytes as fit from the first {@code nioBufferCnt} source buffers into
     * {@code destination}. The position and limit of every source buffer are left exactly as they were;
     * the caller advances them once it knows how many bytes were really written.
     */
    private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) {
        for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) {
            ByteBuffer buffer = source[i];
            int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining());
            if (buffer.hasArray()) {
                // Bulk copy straight from the backing array; this does not touch the source buffer's state.
                destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy);
            } else {
                int initialLimit = buffer.limit();
                int initialPosition = buffer.position();
                buffer.limit(initialPosition + nBytesToCopy);
                destination.put(buffer);
                // Restore limit first: it is always >= the original position, so both calls are valid.
                buffer.limit(initialLimit);
                buffer.position(initialPosition);
            }
        }
    }

    /**
     * Advances the positions of the source buffers, in order, by a total of {@code bytesWritten} bytes so
     * that they reflect what the socket actually accepted.
     */
    private static void setWrittenBytes(ByteBuffer[] source, int nioBufferCnt, int bytesWritten) {
        for (int i = 0; i < nioBufferCnt && bytesWritten > 0; i++) {
            ByteBuffer buffer = source[i];
            int nBytes = Math.min(buffer.remaining(), bytesWritten);
            buffer.position(buffer.position() + nBytes);
            bytesWritten -= nBytes;
        }
    }

    /** Tracks the maximum number of bytes requested from the outbound buffer per write attempt. */
    private final class WriteConfig {

        private volatile int maxBytesPerGatheringWrite = MAX_BYTES_PER_WRITE;

        private WriteConfig() {
            calculateMaxBytesPerGatheringWrite();
        }

        void setMaxBytesPerGatheringWrite(int maxBytesPerGatheringWrite) {
            // Never exceed the capacity of the thread-local direct buffer.
            this.maxBytesPerGatheringWrite = Math.min(maxBytesPerGatheringWrite, MAX_BYTES_PER_WRITE);
        }

        int getMaxBytesPerGatheringWrite() {
            return maxBytesPerGatheringWrite;
        }

        private void calculateMaxBytesPerGatheringWrite() {
            // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
            int newSendBufferSize = config().getSendBufferSize() << 1;
            if (newSendBufferSize > 0) {
                setMaxBytesPerGatheringWrite(newSendBufferSize);
            }
        }
    }
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport.netty4;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -58,6 +59,8 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.net.NetUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
import org.elasticsearch.transport.CopyBytesSocketChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportSettings;
@ -148,7 +151,14 @@ public class Netty4Transport extends TcpTransport {
private Bootstrap createClientBootstrap(NioEventLoopGroup eventLoopGroup) {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
// If direct buffer pooling is disabled, use the CopyBytesSocketChannel which will pool a single
// direct buffer per-event-loop thread which will be used for IO operations.
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
bootstrap.channel(NioSocketChannel.class);
} else {
bootstrap.channel(CopyBytesSocketChannel.class);
}
bootstrap.option(ChannelOption.TCP_NODELAY, TransportSettings.TCP_NO_DELAY.get(settings));
bootstrap.option(ChannelOption.SO_KEEPALIVE, TransportSettings.TCP_KEEP_ALIVE.get(settings));
@ -205,7 +215,15 @@ public class Netty4Transport extends TcpTransport {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup);
// If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
// channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pools a single direct buffer
// per-event-loop thread to be used for IO operations.
if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
serverBootstrap.channel(NioServerSocketChannel.class);
} else {
serverBootstrap.channel(CopyBytesServerSocketChannel.class);
}
serverBootstrap.childHandler(getServerChannelInitializer(name));
serverBootstrap.handler(new ServerChannelExceptionHandler());

View File

@ -135,7 +135,7 @@ public final class NioGroupFactory {
* Wraps the {@link RefCountedNioGroup}. Calls {@link RefCountedNioGroup#decRef()} on close. After close,
* this wrapped instance can no longer be used.
*/
private class WrappedNioGroup implements NioGroup {
private static class WrappedNioGroup implements NioGroup {
private final RefCountedNioGroup refCountedNioGroup;

View File

@ -243,6 +243,9 @@ public abstract class ESTestCase extends LuceneTestCase {
// Enable Netty leak detection and monitor logger for logged leak errors
System.setProperty("io.netty.leakDetection.level", "paranoid");
// Disable direct buffer pooling
System.setProperty("io.netty.allocator.numDirectArenas", "0");
}
protected final Logger logger = LogManager.getLogger(getClass());