Split up large HTTP responses in outbound pipeline (#62666)

Currently Netty will batch compression an entire HTTP response
regardless of its content size. It allocates a byte array at least of
the same size as the uncompressed content. This causes issues with our
attempts to remove humungous G1GC allocations. This commit resolves the
issue by split responses into 128KB chunks.

This has the side-effect of making large outbound HTTP responses that
are compressed be send as chunked transfer-encoding.
This commit is contained in:
Tim Brooks 2020-09-24 14:20:12 -06:00
parent 43a4882951
commit 59dd889c10
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
11 changed files with 323 additions and 23 deletions

View File

@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.compression.JdkZlibEncoder;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpResponse;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.transport.NettyAllocator;
import java.util.List;
/**
* Split up large responses to prevent batch compression {@link JdkZlibEncoder} down the pipeline.
*/
@ChannelHandler.Sharable
class Netty4HttpResponseCreator extends MessageToMessageEncoder<Netty4HttpResponse> {
private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses";
private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES;
private static final int SPLIT_THRESHOLD;
static {
DO_NOT_SPLIT_HTTP_RESPONSES = Booleans.parseBoolean(System.getProperty(DO_NOT_SPLIT), false);
// Netty will add some header bytes if it compresses this message. So we downsize slightly.
SPLIT_THRESHOLD = (int) (NettyAllocator.suggestedMaxAllocationSize() * 0.99);
}
@Override
protected void encode(ChannelHandlerContext ctx, Netty4HttpResponse msg, List<Object> out) {
if (DO_NOT_SPLIT_HTTP_RESPONSES || msg.content().readableBytes() <= SPLIT_THRESHOLD) {
out.add(msg.retain());
} else {
HttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers());
out.add(response);
ByteBuf content = msg.content();
while (content.readableBytes() > SPLIT_THRESHOLD) {
out.add(new DefaultHttpContent(content.readRetainedSlice(SPLIT_THRESHOLD)));
}
out.add(new DefaultLastHttpContent(content.readRetainedSlice(content.readableBytes())));
}
}
}

View File

@ -285,6 +285,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final Netty4HttpServerTransport transport;
private final Netty4HttpRequestCreator requestCreator;
private final Netty4HttpRequestHandler requestHandler;
private final Netty4HttpResponseCreator responseCreator;
private final HttpHandlingSettings handlingSettings;
protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
@ -292,6 +293,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
this.handlingSettings = handlingSettings;
this.requestCreator = new Netty4HttpRequestCreator();
this.requestHandler = new Netty4HttpRequestHandler(transport);
this.responseCreator = new Netty4HttpResponseCreator();
}
@Override
@ -314,6 +316,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
ch.pipeline().addLast("request_creator", requestCreator);
ch.pipeline().addLast("response_creator", responseCreator);
ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("handler", requestHandler);
transport.serverAcceptedChannel(nettyHttpChannel);

View File

@ -163,9 +163,17 @@ public class CopyBytesSocketChannel extends Netty4NioSocketChannel {
private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) {
for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) {
ByteBuffer buffer = source[i];
assert buffer.hasArray() : "Buffer must have heap array";
int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining());
destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy);
if (buffer.hasArray()) {
destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy);
} else {
int initialLimit = buffer.limit();
int initialPosition = buffer.position();
buffer.limit(buffer.position() + nBytesToCopy);
destination.put(buffer);
buffer.position(initialPosition);
buffer.limit(initialLimit);
}
}
}

View File

@ -40,6 +40,7 @@ public class NettyAllocator {
private static final Logger logger = LogManager.getLogger(NettyAllocator.class);
private static final AtomicBoolean descriptionLogged = new AtomicBoolean(false);
private static final long SUGGESTED_MAX_ALLOCATION_SIZE;
private static final ByteBufAllocator ALLOCATOR;
private static final String DESCRIPTION;
@ -50,7 +51,9 @@ public class NettyAllocator {
static {
if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) {
ALLOCATOR = ByteBufAllocator.DEFAULT;
DESCRIPTION = "[name=netty_default, factors={es.unsafe.use_netty_default_allocator=true}]";
SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024;
DESCRIPTION = "[name=netty_default, suggested_max_allocation_size=" + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE)
+ ", factors={es.unsafe.use_netty_default_allocator=true}]";
} else {
final long heapSizeInBytes = JvmInfo.jvmInfo().getMem().getHeapMax().getBytes();
final boolean g1gcEnabled = Boolean.parseBoolean(JvmInfo.jvmInfo().useG1GC());
@ -62,7 +65,15 @@ public class NettyAllocator {
ByteBufAllocator delegate;
if (useUnpooled(heapSizeInBytes, g1gcEnabled, g1gcRegionSizeIsKnown, g1gcRegionSizeInBytes)) {
delegate = UnpooledByteBufAllocator.DEFAULT;
DESCRIPTION = "[name=unpooled, factors={es.unsafe.use_unpooled_allocator=" + userForcedUnpooled()
if (g1gcEnabled && g1gcRegionSizeIsKnown) {
// Suggested max allocation size 1/4 of region size. Guard against unknown edge cases
// where this value would be less than 256KB.
SUGGESTED_MAX_ALLOCATION_SIZE = Math.max(g1gcRegionSizeInBytes >> 2, 256 * 1024);
} else {
SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024;
}
DESCRIPTION = "[name=unpooled, suggested_max_allocation_size=" + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE)
+ ", factors={es.unsafe.use_unpooled_allocator=" + System.getProperty(USE_UNPOOLED)
+ ", g1gc_enabled=" + g1gcEnabled
+ ", g1gc_region_size=" + g1gcRegionSize
+ ", heap_size=" + heapSize + "}]";
@ -92,8 +103,11 @@ public class NettyAllocator {
boolean useCacheForAllThreads = PooledByteBufAllocator.defaultUseCacheForAllThreads();
delegate = new PooledByteBufAllocator(false, nHeapArena, 0, pageSize, maxOrder, tinyCacheSize,
smallCacheSize, normalCacheSize, useCacheForAllThreads);
ByteSizeValue chunkSize = new ByteSizeValue(pageSize << maxOrder);
int chunkSizeInBytes = pageSize << maxOrder;
ByteSizeValue chunkSize = new ByteSizeValue(chunkSizeInBytes);
SUGGESTED_MAX_ALLOCATION_SIZE = chunkSizeInBytes;
DESCRIPTION = "[name=elasticsearch_configured, chunk_size=" + chunkSize
+ ", suggested_max_allocation_size=" + new ByteSizeValue(SUGGESTED_MAX_ALLOCATION_SIZE)
+ ", factors={es.unsafe.use_netty_default_chunk_and_page_size=" + useDefaultChunkAndPageSize()
+ ", g1gc_enabled=" + g1gcEnabled
+ ", g1gc_region_size=" + g1gcRegionSize + "}]";
@ -112,6 +126,10 @@ public class NettyAllocator {
return ALLOCATOR;
}
public static long suggestedMaxAllocationSize() {
return SUGGESTED_MAX_ALLOCATION_SIZE;
}
public static String getAllocatorDescription() {
return DESCRIPTION;
}
@ -135,6 +153,8 @@ public class NettyAllocator {
private static boolean useUnpooled(long heapSizeInBytes, boolean g1gcEnabled, boolean g1gcRegionSizeIsKnown, long g1RegionSize) {
if (userForcedUnpooled()) {
return true;
} else if (userForcedPooled()) {
return true;
} else if (heapSizeInBytes <= 1 << 30) {
// If the heap is 1GB or less we use unpooled
return true;
@ -155,6 +175,14 @@ public class NettyAllocator {
}
}
private static boolean userForcedPooled() {
if (System.getProperty(USE_UNPOOLED) != null) {
return Booleans.parseBoolean(System.getProperty(USE_UNPOOLED)) == false;
} else {
return false;
}
}
private static boolean useDefaultChunkAndPageSize() {
if (System.getProperty(USE_NETTY_DEFAULT_CHUNK) != null) {
return Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT_CHUNK));
@ -163,7 +191,7 @@ public class NettyAllocator {
}
}
private static class NoDirectBuffers implements ByteBufAllocator {
public static class NoDirectBuffers implements ByteBufAllocator {
private final ByteBufAllocator delegate;
@ -271,5 +299,9 @@ public class NettyAllocator {
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
return delegate.calculateNewCapacity(minNewCapacity, maxCapacity);
}
public ByteBufAllocator getDelegate() {
return delegate;
}
}
}

View File

@ -32,6 +32,7 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
@ -91,8 +92,8 @@ class Netty4HttpClient implements Closeable {
.group(new NioEventLoopGroup(1));
}
public Collection<FullHttpResponse> get(SocketAddress remoteAddress, String... uris) throws InterruptedException {
Collection<HttpRequest> requests = new ArrayList<>(uris.length);
public List<FullHttpResponse> get(SocketAddress remoteAddress, String... uris) throws InterruptedException {
List<HttpRequest> requests = new ArrayList<>(uris.length);
for (int i = 0; i < uris.length; i++) {
final HttpRequest httpRequest = new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
httpRequest.headers().add(HOST, "localhost");
@ -107,10 +108,10 @@ class Netty4HttpClient implements Closeable {
return processRequestsWithBody(HttpMethod.POST, remoteAddress, urisAndBodies);
}
public final FullHttpResponse post(SocketAddress remoteAddress, FullHttpRequest httpRequest) throws InterruptedException {
Collection<FullHttpResponse> responses = sendRequests(remoteAddress, Collections.singleton(httpRequest));
public final FullHttpResponse send(SocketAddress remoteAddress, FullHttpRequest httpRequest) throws InterruptedException {
List<FullHttpResponse> responses = sendRequests(remoteAddress, Collections.singleton(httpRequest));
assert responses.size() == 1 : "expected 1 and only 1 http response";
return responses.iterator().next();
return responses.get(0);
}
public final Collection<FullHttpResponse> put(SocketAddress remoteAddress, List<Tuple<String, CharSequence>> urisAndBodies)
@ -118,9 +119,9 @@ class Netty4HttpClient implements Closeable {
return processRequestsWithBody(HttpMethod.PUT, remoteAddress, urisAndBodies);
}
private Collection<FullHttpResponse> processRequestsWithBody(HttpMethod method, SocketAddress remoteAddress, List<Tuple<String,
private List<FullHttpResponse> processRequestsWithBody(HttpMethod method, SocketAddress remoteAddress, List<Tuple<String,
CharSequence>> urisAndBodies) throws InterruptedException {
Collection<HttpRequest> requests = new ArrayList<>(urisAndBodies.size());
List<HttpRequest> requests = new ArrayList<>(urisAndBodies.size());
for (Tuple<String, CharSequence> uriAndBody : urisAndBodies) {
ByteBuf content = Unpooled.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8);
HttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uriAndBody.v1(), content);
@ -132,11 +133,11 @@ class Netty4HttpClient implements Closeable {
return sendRequests(remoteAddress, requests);
}
private synchronized Collection<FullHttpResponse> sendRequests(
private synchronized List<FullHttpResponse> sendRequests(
final SocketAddress remoteAddress,
final Collection<HttpRequest> requests) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(requests.size());
final Collection<FullHttpResponse> content = Collections.synchronizedList(new ArrayList<>(requests.size()));
final List<FullHttpResponse> content = Collections.synchronizedList(new ArrayList<>(requests.size()));
clientBootstrap.handler(new CountDownLatchHandler(latch, content));
@ -180,16 +181,20 @@ class Netty4HttpClient implements Closeable {
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel(SocketChannel ch) {
final int maxContentLength = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt();
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpRequestEncoder());
ch.pipeline().addLast(new HttpContentDecompressor());
ch.pipeline().addLast(new HttpObjectAggregator(maxContentLength));
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpObject>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
final FullHttpResponse response = (FullHttpResponse) msg;
content.add(response.copy());
// We copy the buffer manually to avoid a huge allocation on a pooled allocator. We have
// a test that tracks huge allocations, so we want to avoid them in this test code.
ByteBuf newContent = Unpooled.copiedBuffer(((FullHttpResponse) msg).content());
content.add(response.replace(newContent));
latch.countDown();
}

View File

@ -20,7 +20,11 @@
package org.elasticsearch.http.netty4;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerAdapter;
@ -178,13 +182,13 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
request.headers().set(HttpHeaderNames.EXPECT, expectation);
HttpUtil.setContentLength(request, contentLength);
final FullHttpResponse response = client.post(remoteAddress.address(), request);
final FullHttpResponse response = client.send(remoteAddress.address(), request);
try {
assertThat(response.status(), equalTo(expectedStatus));
if (expectedStatus.equals(HttpResponseStatus.CONTINUE)) {
final FullHttpRequest continuationRequest =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/", Unpooled.EMPTY_BUFFER);
final FullHttpResponse continuationResponse = client.post(remoteAddress.address(), continuationRequest);
final FullHttpResponse continuationResponse = client.send(remoteAddress.address(), continuationRequest);
try {
assertThat(continuationResponse.status(), is(HttpResponseStatus.OK));
assertThat(
@ -266,7 +270,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
final String url = "/" + new String(new byte[maxInitialLineLength], Charset.forName("UTF-8"));
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
final FullHttpResponse response = client.post(remoteAddress.address(), request);
final FullHttpResponse response = client.send(remoteAddress.address(), request);
try {
assertThat(response.status(), equalTo(HttpResponseStatus.BAD_REQUEST));
assertThat(
@ -282,6 +286,66 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
assertThat(causeReference.get(), instanceOf(TooLongFrameException.class));
}
public void testLargeCompressedResponse() throws InterruptedException {
final String responseString = randomAlphaOfLength(4 * 1024 * 1024);
final String url = "/thing";
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
if (url.equals(request.uri())) {
channel.sendResponse(new BytesRestResponse(OK, responseString));
} else {
logger.error("--> Unexpected successful uri [{}]", request.uri());
throw new AssertionError();
}
}
@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
logger.error(new ParameterizedMessage("--> Unexpected bad request [{}]",
FakeRestRequest.requestToString(channel.request())), cause);
throw new AssertionError();
}
};
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(
Settings.EMPTY, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher, clusterSettings,
new SharedGroupFactory(Settings.EMPTY))) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
try (Netty4HttpClient client = new Netty4HttpClient()) {
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip"));
long numOfHugeAllocations = getHugeAllocationCount();
final FullHttpResponse response = client.send(remoteAddress.address(), request);
try {
assertThat(getHugeAllocationCount(), equalTo(numOfHugeAllocations));
assertThat(response.status(), equalTo(HttpResponseStatus.OK));
byte[] bytes = new byte[response.content().readableBytes()];
response.content().readBytes(bytes);
assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString));
} finally {
response.release();
}
}
}
}
private long getHugeAllocationCount() {
long numOfHugAllocations = 0;
ByteBufAllocator allocator = NettyAllocator.getAllocator();
assert allocator instanceof NettyAllocator.NoDirectBuffers;
ByteBufAllocator delegate = ((NettyAllocator.NoDirectBuffers) allocator).getDelegate();
if (delegate instanceof PooledByteBufAllocator) {
PooledByteBufAllocatorMetric metric = ((PooledByteBufAllocator) delegate).metric();
numOfHugAllocations = metric.heapArenas().stream().mapToLong(PoolArenaMetric::numHugeAllocations).sum();
}
return numOfHugAllocations;
}
public void testCorsRequest() throws InterruptedException {
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@ -318,7 +382,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
request.headers().add(CorsHandler.ORIGIN, "elastic.co");
request.headers().add(CorsHandler.ACCESS_CONTROL_REQUEST_METHOD, "POST");
final FullHttpResponse response = client.post(remoteAddress.address(), request);
final FullHttpResponse response = client.send(remoteAddress.address(), request);
try {
assertThat(response.status(), equalTo(HttpResponseStatus.OK));
assertThat(response.headers().get(CorsHandler.ACCESS_CONTROL_ALLOW_ORIGIN), equalTo("elastic.co"));
@ -334,7 +398,7 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
final FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
request.headers().add(CorsHandler.ORIGIN, "elastic2.co");
final FullHttpResponse response = client.post(remoteAddress.address(), request);
final FullHttpResponse response = client.send(remoteAddress.address(), request);
try {
assertThat(response.status(), equalTo(HttpResponseStatus.FORBIDDEN));
} finally {

View File

@ -77,6 +77,7 @@ public class HttpReadWriteHandler implements NioChannelHandler {
handlers.add(new HttpContentCompressor(settings.getCompressionLevel()));
}
handlers.add(new NioHttpRequestCreator());
handlers.add(new NioHttpResponseCreator());
handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));
adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.http.nio;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.http.FullHttpRequest;
@ -26,6 +27,7 @@ import org.elasticsearch.ExceptionsHelper;
import java.util.List;
@ChannelHandler.Sharable
class NioHttpRequestCreator extends MessageToMessageDecoder<FullHttpRequest> {
@Override

View File

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.nio;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpResponse;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.monitor.jvm.JvmInfo;
import java.util.List;
/**
* Split up large responses to prevent batch compression or other CPU intensive operations down the pipeline.
*/
@ChannelHandler.Sharable
public class NioHttpResponseCreator extends MessageToMessageEncoder<NioHttpResponse> {
private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses";
private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES;
private static final int SPLIT_THRESHOLD;
static {
DO_NOT_SPLIT_HTTP_RESPONSES = Booleans.parseBoolean(System.getProperty(DO_NOT_SPLIT), false);
// Netty will add some header bytes if it compresses this message. So we downsize slightly.
SPLIT_THRESHOLD = (int) (suggestedMaxAllocationSize() * 0.99);
}
@Override
protected void encode(ChannelHandlerContext ctx, NioHttpResponse msg, List<Object> out) {
if (DO_NOT_SPLIT_HTTP_RESPONSES || msg.content().readableBytes() <= SPLIT_THRESHOLD) {
out.add(msg.retain());
} else {
HttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers());
out.add(response);
ByteBuf content = msg.content();
while (content.readableBytes() > SPLIT_THRESHOLD) {
out.add(new DefaultHttpContent(content.readRetainedSlice(SPLIT_THRESHOLD)));
}
out.add(new DefaultLastHttpContent(content.readRetainedSlice(content.readableBytes())));
}
}
private static long suggestedMaxAllocationSize() {
return Math.max(Math.max(JvmInfo.jvmInfo().getG1RegionSize(), 0) >> 2, 256 * 1024);
}
}

View File

@ -25,6 +25,7 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
@ -226,6 +227,7 @@ class NioHttpClient implements Closeable {
List<ChannelHandler> handlers = new ArrayList<>(5);
handlers.add(new HttpResponseDecoder());
handlers.add(new HttpRequestEncoder());
handlers.add(new HttpContentDecompressor());
handlers.add(new HttpObjectAggregator(maxContentLength));
adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));

View File

@ -279,6 +279,52 @@ public class NioHttpServerTransportTests extends ESTestCase {
}
}
public void testLargeCompressedResponse() throws InterruptedException {
final String responseString = randomAlphaOfLength(4 * 1024 * 1024);
final String url = "/thing";
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {
@Override
public void dispatchRequest(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) {
if (url.equals(request.uri())) {
channel.sendResponse(new BytesRestResponse(OK, responseString));
} else {
logger.error("--> Unexpected successful uri [{}]", request.uri());
throw new AssertionError();
}
}
@Override
public void dispatchBadRequest(final RestChannel channel, final ThreadContext threadContext, final Throwable cause) {
logger.error(new ParameterizedMessage("--> Unexpected bad request [{}]",
FakeRestRequest.requestToString(channel.request())), cause);
throw new AssertionError();
}
};
try (NioHttpServerTransport transport = new NioHttpServerTransport(
Settings.EMPTY, networkService, bigArrays, pageRecycler, threadPool, xContentRegistry(), dispatcher,
new NioGroupFactory(Settings.EMPTY, logger), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))) {
transport.start();
final TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
try (NioHttpClient client = new NioHttpClient()) {
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
request.headers().add(HttpHeaderNames.ACCEPT_ENCODING, randomFrom("deflate", "gzip"));
final FullHttpResponse response = client.send(remoteAddress.address(), request);
try {
assertThat(response.status(), equalTo(HttpResponseStatus.OK));
byte[] bytes = new byte[response.content().readableBytes()];
response.content().readBytes(bytes);
assertThat(new String(bytes, StandardCharsets.UTF_8), equalTo(responseString));
} finally {
response.release();
}
}
}
}
public void testBadRequest() throws InterruptedException {
final AtomicReference<Throwable> causeReference = new AtomicReference<>();
final HttpServerTransport.Dispatcher dispatcher = new HttpServerTransport.Dispatcher() {