diff --git a/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java b/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java
index e06d93ccdc7..6c74029f9f5 100644
--- a/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java
+++ b/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java
@@ -24,6 +24,7 @@ import org.apache.lucene.util.CharsRefBuilder;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -42,7 +43,6 @@ import java.util.Arrays;
public class PagedBytesReference implements BytesReference {
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
- private static final int NIO_GATHERING_LIMIT = 524288;
private final BigArrays bigarrays;
protected final ByteArray bytearray;
@@ -230,9 +230,7 @@ public class PagedBytesReference implements BytesReference {
// this would indicate that our numBuffer calculation is off by one.
assert (numBuffers == bufferSlot);
- // we can use gathering writes from the ChannelBuffers, but only if they are
- // moderately small to prevent OOMs due to DirectBuffer allocations.
- return ChannelBuffers.wrappedBuffer(length <= NIO_GATHERING_LIMIT, buffers);
+ return ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, buffers);
}
@Override
diff --git a/src/main/java/org/elasticsearch/common/netty/NettyUtils.java b/src/main/java/org/elasticsearch/common/netty/NettyUtils.java
index 57963fe01cf..b33ed2b7b37 100644
--- a/src/main/java/org/elasticsearch/common/netty/NettyUtils.java
+++ b/src/main/java/org/elasticsearch/common/netty/NettyUtils.java
@@ -18,22 +18,64 @@
*/
package org.elasticsearch.common.netty;
-import com.google.common.collect.Lists;
+import org.elasticsearch.common.Booleans;
+import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.buffer.CompositeChannelBuffer;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
-import java.util.List;
-
/**
*/
public class NettyUtils {
+ /**
+ * Here we go....
+ *
+ * When the socket or file channel APIs are used to write or read with a heap ByteBuffer, the sun.nio
+ * package converts it to a direct buffer before doing the actual operation. That direct buffer is
+ * cached in an array of buffers under sun.nio.ch.Util$BufferCache, held in a thread local.
+ *
+ * In netty specifically, if we send a single ChannelBuffer that is bigger than
+ * SocketSendBufferPool#DEFAULT_PREALLOCATION_SIZE (64kb), it will simply convert the ChannelBuffer
+ * to a ByteBuffer and send it. The problem is that a DirectByteBuffer of the same size will then be
+ * allocated (or reused) and kept around in the thread-local sun.nio BufferCache. If a very
+ * large buffer is sent, say a 10mb one, then a 10mb direct buffer will be allocated as an
+ * entry within the thread-local buffers.
+ *
+ * In ES, we try to page the buffers we allocate: all serialized data uses {@link org.elasticsearch.common.bytes.PagedBytesReference},
+ * typically generated from {@link org.elasticsearch.common.io.stream.BytesStreamOutput}. When handing it over
+ * to netty, it creates a {@link org.jboss.netty.buffer.CompositeChannelBuffer} that wraps the relevant pages.
+ *
+ * The idea behind using a composite channel buffer is that a single large buffer is never handed over
+ * to the sun.nio layer. But this only happens if the composite channel buffer is created with the gathering
+ * flag set to true. In that case, netty uses its GatheringSendBuffer and calls the sun.nio
+ * layer with a ByteBuffer array.
+ *
+ * This could still have been just as disastrous if the sun.nio layer copied all of it into a single
+ * direct buffer. But the write(ByteBuffer[]) API (see sun.nio.ch.IOUtil) goes one buffer
+ * at a time, getting a temporary direct buffer from the BufferCache, up to a limit of IOUtil#IOV_MAX (which
+ * is 1024 on most OSes). This means there will be at most 1024 direct buffers per thread.
+ *
+ * This is still less than optimal, to be honest, since it means that if not all the data was written
+ * successfully (more than 1024 paged buffers, or a partial write), the rest of the data will need to be
+ * copied over to direct buffers again and re-transmitted, but it is much better than trying to send the
+ * full large buffer over and over again.
+ *
+ * In ES, our paged data structures use a 16kb page by default, so this is not so terrible.
+ *
+ * Note, on the read side of netty, it uses a single direct buffer whose size is defined in both the transport
+ * and http configuration (based on the direct memory available), and the upstream handlers (SizeHeaderFrameDecoder,
+ * or more specifically the FrameDecoder base class) make sure to use a cumulation buffer and not copy it
+ * over all the time.
+ *
+ * TODO: potentially, a more complete solution would be to write a netty channel handler that is the last
+ * in the pipeline, and if the buffer is composite, verifies that it is a gathering one with reasonably
+ * sized pages, and if it is a single buffer, makes sure that it gets sliced and wrapped in a composite
+ * buffer.
+ */
+ public static final boolean DEFAULT_GATHERING;
+
private static EsThreadNameDeterminer ES_THREAD_NAME_DETERMINER = new EsThreadNameDeterminer();
public static class EsThreadNameDeterminer implements ThreadNameDeterminer {
@@ -53,25 +95,16 @@ public class NettyUtils {
});
ThreadRenamingRunnable.setThreadNameDeterminer(ES_THREAD_NAME_DETERMINER);
+
+ /**
+ * This is here just to give us an option to roll back the change; once it has proven stable, we
+ * should remove the ability to set it at all.
+ */
+ DEFAULT_GATHERING = Booleans.parseBoolean(System.getProperty("es.netty.gathering"), true);
+ Loggers.getLogger(NettyUtils.class).debug("using gathering [{}]", DEFAULT_GATHERING);
}
public static void setup() {
}
-
- public static ChannelBuffer buildComposite(boolean useGathering, ChannelBuffer... buffers) {
- if (buffers == null || buffers.length == 0) {
- return ChannelBuffers.EMPTY_BUFFER;
- }
- List<ChannelBuffer> list = Lists.newArrayList();
- for (ChannelBuffer buffer : buffers) {
- if (buffer instanceof CompositeChannelBuffer) {
- CompositeChannelBuffer compBuffer = (CompositeChannelBuffer) buffer;
- list.addAll(compBuffer.decompose(0, compBuffer.readableBytes()));
- } else {
- list.add(buffer);
- }
- }
- return new CompositeChannelBuffer(buffers[0].order(), list, useGathering);
- }
}
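
Every call site touched in this change follows the same pattern: the page-sized ChannelBuffers are handed to ChannelBuffers.wrappedBuffer together with NettyUtils.DEFAULT_GATHERING, so the gathering decision is made in exactly one place. Below is a minimal sketch of that pattern outside the diff context; the class name, helper method, and page size are made up for illustration, and only the wrappedBuffer overload mirrors the change itself.

    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;

    public class GatheringWrapSketch {

        // Illustrative page size, mirroring the 16kb pages mentioned in the javadoc above.
        private static final int PAGE = 16 * 1024;

        static ChannelBuffer wrapPages(boolean gathering, byte[][] pages) {
            ChannelBuffer[] buffers = new ChannelBuffer[pages.length];
            for (int i = 0; i < pages.length; i++) {
                buffers[i] = ChannelBuffers.wrappedBuffer(pages[i]);
            }
            // With gathering == true netty keeps this as a composite and hands the sun.nio layer
            // a ByteBuffer[], so no single large direct buffer ever needs to be allocated.
            return ChannelBuffers.wrappedBuffer(gathering, buffers);
        }

        public static void main(String[] args) {
            ChannelBuffer composite = wrapPages(true /* NettyUtils.DEFAULT_GATHERING */, new byte[8][PAGE]);
            System.out.println("readable bytes: " + composite.readableBytes());
        }
    }

If gathering writes ever misbehave in a deployment, the es.netty.gathering system property added above flips the default back, which is the rollback path the comment in the static initializer refers to.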
diff --git a/src/main/java/org/elasticsearch/http/netty/ESHttpResponseEncoder.java b/src/main/java/org/elasticsearch/http/netty/ESHttpResponseEncoder.java
new file mode 100644
index 00000000000..36f2c3a138b
--- /dev/null
+++ b/src/main/java/org/elasticsearch/http/netty/ESHttpResponseEncoder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.http.netty;
+
+import org.elasticsearch.common.netty.NettyUtils;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.CompositeChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+
+import java.util.List;
+
+/**
+ * Extends the netty {@link HttpResponseEncoder} and makes sure that if the resulting
+ * channel buffer is composite, it uses the correct gathering flag. See
+ * {@link NettyUtils#DEFAULT_GATHERING} for more details.
+ */
+public class ESHttpResponseEncoder extends HttpResponseEncoder {
+
+ @Override
+ protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
+ Object retVal = super.encode(ctx, channel, msg);
+ if (retVal instanceof CompositeChannelBuffer) {
+ CompositeChannelBuffer ccb = (CompositeChannelBuffer) retVal;
+ if (ccb.useGathering() != NettyUtils.DEFAULT_GATHERING) {
+ List<ChannelBuffer> decompose = ccb.decompose(ccb.readerIndex(), ccb.readableBytes());
+ return ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING,
+ decompose.toArray(new ChannelBuffer[decompose.size()]));
+ }
+ }
+ return retVal;
+ }
+}
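
The hunk that actually swaps this encoder into the HTTP pipeline is not part of this excerpt (only the removal of the now-unused HttpResponseEncoder import appears below), but the intent is that the pipeline factory in NettyHttpServerTransport registers ESHttpResponseEncoder in place of the stock encoder. A rough sketch under that assumption; the class and method names here are illustrative, not the real pipeline factory:

    import org.elasticsearch.http.netty.ESHttpResponseEncoder;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.handler.codec.http.HttpRequestDecoder;

    public class HttpPipelineSketch {

        static ChannelPipeline buildPipeline() {
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", new HttpRequestDecoder());
            // Before this change the stock netty encoder was used here:
            //   pipeline.addLast("encoder", new HttpResponseEncoder());
            // The ES variant re-wraps composite responses with the configured gathering flag.
            pipeline.addLast("encoder", new ESHttpResponseEncoder());
            return pipeline;
        }
    }

The real pipeline also contains chunk aggregation, optional compression, and the ES request handler; they are omitted here to keep the sketch focused on the encoder swap.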
diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java
index 8eb9ff8783a..ce3e6e1bff6 100644
--- a/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java
+++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java
@@ -25,6 +25,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.rest.RestResponse;
@@ -148,7 +149,7 @@ public class NettyHttpChannel extends HttpChannel {
final BytesRef callbackBytes = new BytesRef(callback);
callbackBytes.bytes[callbackBytes.length] = '(';
callbackBytes.length++;
- buffer = ChannelBuffers.wrappedBuffer(
+ buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING,
ChannelBuffers.wrappedBuffer(callbackBytes.bytes, callbackBytes.offset, callbackBytes.length),
buffer,
ChannelBuffers.wrappedBuffer(END_JSONP)
diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
index d34d97c8510..5fec6dca6ff 100644
--- a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
+++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
@@ -46,7 +46,6 @@ import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpContentCompressor;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import java.io.IOException;
@@ -367,7 +366,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent implem
bytes = bStream.bytes();
ChannelBuffer headerBuffer = bytes.toChannelBuffer();
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
- buffer = NettyUtils.buildComposite(false, headerBuffer, contentBuffer);
+ buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
} else {
request.writeTo(stream);
stream.close();
diff --git a/src/test/java/org/elasticsearch/network/DirectBufferNetworkTests.java b/src/test/java/org/elasticsearch/network/DirectBufferNetworkTests.java
new file mode 100644
index 00000000000..957422d78c4
--- /dev/null
+++ b/src/test/java/org/elasticsearch/network/DirectBufferNetworkTests.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.network;
+
+import org.apache.http.impl.client.HttpClients;
+import org.apache.lucene.util.TestUtil;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+
+/**
+ */
+public class DirectBufferNetworkTests extends ElasticsearchIntegrationTest {
+
+ /**
+ * This test validates that using large data sets (large docs + large API requests) does not
+ * cause a large direct byte buffer to be allocated internally in the sun.nio buffer cache.
+ *
+ * See {@link org.elasticsearch.common.netty.NettyUtils#DEFAULT_GATHERING} for more info.
+ */
+ @Test
+ public void verifySaneDirectBufferAllocations() throws Exception {
+ createIndex("test");
+
+ int estimatedBytesSize = scaledRandomIntBetween(ByteSizeValue.parseBytesSizeValue("1.1mb").bytesAsInt(), ByteSizeValue.parseBytesSizeValue("1.5mb").bytesAsInt());
+ byte[] data = new byte[estimatedBytesSize];
+ getRandom().nextBytes(data);
+
+ ByteArrayOutputStream docOut = new ByteArrayOutputStream();
+ // we use smile to automatically use the binary mapping
+ XContentBuilder doc = XContentFactory.smileBuilder(docOut).startObject().startObject("doc").field("value", data).endObject();
+ doc.close();
+ byte[] docBytes = docOut.toByteArray();
+
+ int numDocs = randomIntBetween(2, 5);
+ logger.info("indexing [{}] docs, each with size [{}]", numDocs, estimatedBytesSize);
+ IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
+ for (int i = 0; i < numDocs; ++i) {
+ builders[i] = client().prepareIndex("test", "type").setSource(docBytes);
+ }
+ indexRandom(true, builders);
+ logger.info("done indexing");
+
+ logger.info("executing random client search for all docs");
+ assertHitCount(client().prepareSearch("test").setFrom(0).setSize(numDocs).get(), numDocs);
+ logger.info("executing transport client search for all docs");
+ assertHitCount(internalCluster().transportClient().prepareSearch("test").setFrom(0).setSize(numDocs).get(), numDocs);
+
+ logger.info("executing HTTP search for all docs");
+ // simulate large HTTP call as well
+ httpClient().method("GET").path("/test/_search").addParam("size", Integer.toString(numDocs)).execute();
+
+ logger.info("validating large direct buffer not allocated");
+ validateNoLargeDirectBufferAllocated();
+ }
+
+ private static HttpRequestBuilder httpClient() {
+ HttpServerTransport httpServerTransport = internalCluster().getDataNodeInstance(HttpServerTransport.class);
+ InetSocketAddress address = ((InetSocketTransportAddress) httpServerTransport.boundAddress().publishAddress()).address();
+ return new HttpRequestBuilder(HttpClients.createDefault()).host(address.getHostName()).port(address.getPort());
+ }
+
+ /**
+ * Validates that none of the thread-local ByteBuffers allocated by sun.nio under Util$BufferCache
+ * are greater than 1mb.
+ */
+ private void validateNoLargeDirectBufferAllocated() throws Exception {
+ // Make the fields in the Thread class that store ThreadLocals
+ // accessible
+ Field threadLocalsField = Thread.class.getDeclaredField("threadLocals");
+ threadLocalsField.setAccessible(true);
+ // Make the underlying array of ThreadLocal.ThreadLocalMap.Entry objects
+ // accessible
+ Class<?> tlmClass = Class.forName("java.lang.ThreadLocal$ThreadLocalMap");
+ Field tableField = tlmClass.getDeclaredField("table");
+ tableField.setAccessible(true);
+
+ for (Thread thread : Thread.getAllStackTraces().keySet()) {
+ if (thread == null) {
+ continue;
+ }
+ Object threadLocalMap = threadLocalsField.get(thread);
+ if (threadLocalMap == null) {
+ continue;
+ }
+ Object[] table = (Object[]) tableField.get(threadLocalMap);
+ if (table == null) {
+ continue;
+ }
+ for (Object entry : table) {
+ if (entry == null) {
+ continue;
+ }
+ Field valueField = entry.getClass().getDeclaredField("value");
+ valueField.setAccessible(true);
+ Object value = valueField.get(entry);
+ if (value == null) {
+ continue;
+ }
+ if (!value.getClass().getName().equals("sun.nio.ch.Util$BufferCache")) {
+ continue;
+ }
+ Field buffersField = value.getClass().getDeclaredField("buffers");
+ buffersField.setAccessible(true);
+ Object[] buffers = (Object[]) buffersField.get(value);
+ for (Object buffer : buffers) {
+ if (buffer == null) {
+ continue;
+ }
+ assertThat(((ByteBuffer) buffer).capacity(), Matchers.lessThan(1 * 1024 * 1024));
+ }
+ }
+ }
+
+ }
+}