From 5eeac2fdf6c108e2f9b6305b6577a77e5290ad9e Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 31 Oct 2014 16:07:26 +0100 Subject: [PATCH] Netty: Add HTTP pipelining support This adds HTTP pipelining support to netty. Previously pipelining was not supported due to the asynchronous nature of elasticsearch. The first request that was returned by Elasticsearch, was returned as first response, regardless of the correct order. The solution to this problem is to add a handler to the netty pipeline that maintains an ordered list and thus orders the responses before returning them to the client. This means, we will always have some state on the server side and also requires some memory in order to keep the responses there. Pipelining is enabled by default, but can be configured by setting the http.pipelining property to true|false. In addition the maximum size of the event queue can be configured. The initial netty handler is copied from this repo https://github.com/typesafehub/netty-http-pipelining Closes #2665 --- docs/reference/modules/http.asciidoc | 5 +- pom.xml | 7 +- .../http/netty/HttpRequestHandler.java | 19 +- .../http/netty/NettyHttpChannel.java | 29 ++- .../http/netty/NettyHttpRequest.java | 4 + .../http/netty/NettyHttpServerTransport.java | 25 +- .../pipelining/HttpPipeliningHandler.java | 109 +++++++++ .../OrderedDownstreamChannelEvent.java | 77 ++++++ .../OrderedUpstreamMessageEvent.java | 25 ++ .../http/netty/NettyHttpClient.java | 163 +++++++++++++ .../netty/NettyHttpServerPipeliningTest.java | 227 ++++++++++++++++++ ...ettyPipeliningDisabledIntegrationTest.java | 78 ++++++ ...NettyPipeliningEnabledIntegrationTest.java | 75 ++++++ .../pipelining/HttpPipeliningHandlerTest.java | 215 +++++++++++++++++ .../test/ElasticsearchIntegrationTest.java | 3 +- .../test/InternalTestCluster.java | 8 +- .../test/test/InternalTestClusterTests.java | 10 +- ...ttyTransportMultiPortIntegrationTests.java | 1 - .../org/elasticsearch/tribe/TribeTests.java | 2 +- 19 files changed, 1055 insertions(+), 27 deletions(-) create mode 100644 src/main/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandler.java create mode 100644 src/main/java/org/elasticsearch/http/netty/pipelining/OrderedDownstreamChannelEvent.java create mode 100644 src/main/java/org/elasticsearch/http/netty/pipelining/OrderedUpstreamMessageEvent.java create mode 100644 src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java create mode 100644 src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTest.java create mode 100644 src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIntegrationTest.java create mode 100644 src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIntegrationTest.java create mode 100644 src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java diff --git a/docs/reference/modules/http.asciidoc b/docs/reference/modules/http.asciidoc index 3f7d1b6109f..0ae864fa5e2 100644 --- a/docs/reference/modules/http.asciidoc +++ b/docs/reference/modules/http.asciidoc @@ -15,8 +15,6 @@ when connecting for better performance and try to get your favorite client not to do http://en.wikipedia.org/wiki/Chunked_transfer_encoding[HTTP chunking]. -IMPORTANT: HTTP pipelining is not supported and should be disabled in your HTTP client. - [float] === Settings @@ -69,6 +67,9 @@ be cached for. Defaults to `1728000` (20 days) header should be returned. Note: This header is only returned, when the setting is set to `true`. Defaults to `false` +|`http.pipelining` |Enable or disable HTTP pipelining, defaults to `true`. + +|`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`. |======================================================================= diff --git a/pom.xml b/pom.xml index 284418b5de0..463ef2287e4 100644 --- a/pom.xml +++ b/pom.xml @@ -1406,17 +1406,18 @@ src/test/java/org/elasticsearch/**/*.java - src/main/java/org/elasticsearch/common/inject/** + src/main/java/org/elasticsearch/common/inject/** src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java src/main/java/org/elasticsearch/common/lucene/search/XBooleanFilter.java src/main/java/org/elasticsearch/common/lucene/search/XFilteredQuery.java src/main/java/org/apache/lucene/queryparser/XSimpleQueryParser.java src/main/java/org/apache/lucene/**/X*.java - src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java - + src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java src/test/java/org/elasticsearch/search/aggregations/metrics/GroupTree.java + + src/main/java/org/elasticsearch/http/netty/pipelining/** diff --git a/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java b/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java index f4ab7a0d808..d62dc0abbda 100644 --- a/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java +++ b/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java @@ -19,6 +19,7 @@ package org.elasticsearch.http.netty; +import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent; import org.elasticsearch.rest.support.RestUtils; import org.jboss.netty.channel.*; import org.jboss.netty.handler.codec.http.HttpRequest; @@ -34,19 +35,33 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler { private final NettyHttpServerTransport serverTransport; private final Pattern corsPattern; + private final boolean httpPipeliningEnabled; public HttpRequestHandler(NettyHttpServerTransport serverTransport) { this.serverTransport = serverTransport; this.corsPattern = RestUtils.getCorsSettingRegex(serverTransport.settings()); + this.httpPipeliningEnabled = serverTransport.pipelining; } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - HttpRequest request = (HttpRequest) e.getMessage(); + HttpRequest request; + OrderedUpstreamMessageEvent oue = null; + if (this.httpPipeliningEnabled && e instanceof OrderedUpstreamMessageEvent) { + oue = (OrderedUpstreamMessageEvent) e; + request = (HttpRequest) oue.getMessage(); + } else { + request = (HttpRequest) e.getMessage(); + } + // the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally // when reading, or using a cumalation buffer NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel()); - serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), httpRequest, corsPattern)); + if (oue != null) { + serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, corsPattern, oue)); + } else { + serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, corsPattern)); + } super.messageReceived(ctx, e); } diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java index ce3e6e1bff6..1693b199d7c 100644 --- a/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java +++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java @@ -28,14 +28,14 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.netty.NettyUtils; import org.elasticsearch.common.netty.ReleaseChannelFutureListener; import org.elasticsearch.http.HttpChannel; +import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent; +import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.support.RestUtils; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.*; import org.jboss.netty.handler.codec.http.*; import java.util.List; @@ -61,16 +61,22 @@ public class NettyHttpChannel extends HttpChannel { private final NettyHttpServerTransport transport; private final Channel channel; private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest; + private OrderedUpstreamMessageEvent orderedUpstreamMessageEvent = null; private Pattern corsPattern; - public NettyHttpChannel(NettyHttpServerTransport transport, Channel channel, NettyHttpRequest request, Pattern corsPattern) { + public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, Pattern corsPattern) { super(request); this.transport = transport; - this.channel = channel; + this.channel = request.getChannel(); this.nettyRequest = request.request(); this.corsPattern = corsPattern; } + public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, Pattern corsPattern, OrderedUpstreamMessageEvent orderedUpstreamMessageEvent) { + this(transport, request, corsPattern); + this.orderedUpstreamMessageEvent = orderedUpstreamMessageEvent; + } + @Override public BytesStreamOutput newBytesOutput() { return new ReleasableBytesStreamOutput(transport.bigArrays); @@ -185,14 +191,25 @@ public class NettyHttpChannel extends HttpChannel { } } - ChannelFuture future = channel.write(resp); + ChannelFuture future; + + if (orderedUpstreamMessageEvent != null) { + OrderedDownstreamChannelEvent downstreamChannelEvent = new OrderedDownstreamChannelEvent(orderedUpstreamMessageEvent, 0, true, resp); + future = downstreamChannelEvent.getFuture(); + channel.getPipeline().sendDownstream(downstreamChannelEvent); + } else { + future = channel.write(resp); + } + if (response.contentThreadSafe() && content instanceof Releasable) { future.addListener(new ReleaseChannelFutureListener((Releasable) content)); addedReleaseListener = true; } + if (close) { future.addListener(ChannelFutureListener.CLOSE); } + } finally { if (!addedReleaseListener && content instanceof Releasable) { ((Releasable) content).close(); diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java index d392f597cd2..4a55112ba80 100644 --- a/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java +++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java @@ -145,6 +145,10 @@ public class NettyHttpRequest extends HttpRequest { return channel.getLocalAddress(); } + public Channel getChannel() { + return channel; + } + @Override public String header(String name) { return request.headers().get(name); diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index 5fec6dca6ff..62baa59a72b 100644 --- a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.http.*; +import org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.transport.BindTransportException; import org.jboss.netty.bootstrap.ServerBootstrap; @@ -72,6 +73,13 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent Integer.MAX_VALUE) { @@ -174,8 +188,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent{}]", - maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax); + logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], receive_predictor[{}->{}], pipelining[{}], pipelining_max_events[{}]", + maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax, pipelining, pipeliningMaxEvents); } public Settings settings() { @@ -370,6 +384,9 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent holdingQueue; + + /** + * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel + * connection. This is required as events cannot queue up indefintely; we would run out of + * memory if this was the case. + */ + public HttpPipeliningHandler(final int maxEventsHeld) { + this.maxEventsHeld = maxEventsHeld; + + holdingQueue = new PriorityQueue<>(INITIAL_EVENTS_HELD, new Comparator() { + @Override + public int compare(OrderedDownstreamChannelEvent o1, OrderedDownstreamChannelEvent o2) { + final int delta = o1.getOrderedUpstreamMessageEvent().getSequence() - o2.getOrderedUpstreamMessageEvent().getSequence(); + if (delta == 0) { + return o1.getSubsequence() - o2.getSubsequence(); + } else { + return delta; + } + } + }); + } + + public int getMaxEventsHeld() { + return maxEventsHeld; + } + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) { + final Object msg = e.getMessage(); + if (msg instanceof HttpRequest) { + ctx.sendUpstream(new OrderedUpstreamMessageEvent(sequence++, e.getChannel(), msg, e.getRemoteAddress())); + } else { + ctx.sendUpstream(e); + } + } + + @Override + public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) + throws Exception { + if (e instanceof OrderedDownstreamChannelEvent) { + + boolean channelShouldClose = false; + + synchronized (holdingQueue) { + if (holdingQueue.size() < maxEventsHeld) { + + final OrderedDownstreamChannelEvent currentEvent = (OrderedDownstreamChannelEvent) e; + holdingQueue.add(currentEvent); + + while (!holdingQueue.isEmpty()) { + final OrderedDownstreamChannelEvent nextEvent = holdingQueue.peek(); + + if (nextEvent.getOrderedUpstreamMessageEvent().getSequence() != nextRequiredSequence | + nextEvent.getSubsequence() != nextRequiredSubsequence) { + break; + } + holdingQueue.remove(); + ctx.sendDownstream(nextEvent.getChannelEvent()); + if (nextEvent.isLast()) { + ++nextRequiredSequence; + nextRequiredSubsequence = 0; + } else { + ++nextRequiredSubsequence; + } + } + + } else { + channelShouldClose = true; + } + } + + if (channelShouldClose) { + Channels.close(e.getChannel()); + } + } else { + super.handleDownstream(ctx, e); + } + } + +} diff --git a/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedDownstreamChannelEvent.java b/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedDownstreamChannelEvent.java new file mode 100644 index 00000000000..6b713a08020 --- /dev/null +++ b/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedDownstreamChannelEvent.java @@ -0,0 +1,77 @@ +package org.elasticsearch.http.netty.pipelining; + +import org.jboss.netty.channel.*; + +/** + * Permits downstream channel events to be ordered and signalled as to whether more are to come for a given sequence. + * + * @author Christopher Hunt + */ +public class OrderedDownstreamChannelEvent implements ChannelEvent { + + final ChannelEvent ce; + final OrderedUpstreamMessageEvent oue; + final int subsequence; + final boolean last; + + /** + * Construct a downstream channel event for all types of events. + * + * @param oue the OrderedUpstreamMessageEvent that this response is associated with + * @param subsequence the sequence within the sequence + * @param last when set to true this indicates that there are no more responses to be received for the + * original OrderedUpstreamMessageEvent + */ + public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oue, final int subsequence, boolean last, + final ChannelEvent ce) { + this.oue = oue; + this.ce = ce; + this.subsequence = subsequence; + this.last = last; + } + + /** + * Convenience constructor signifying that this downstream message event is the last one for the given sequence, + * and that there is only one response. + */ + public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oe, + final Object message) { + this(oe, 0, true, message); + } + + /** + * Convenience constructor for passing message events. + */ + public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oue, final int subsequence, boolean last, + final Object message) { + this(oue, subsequence, last, new DownstreamMessageEvent(oue.getChannel(), Channels.future(oue.getChannel()), + message, oue.getRemoteAddress())); + + } + + public OrderedUpstreamMessageEvent getOrderedUpstreamMessageEvent() { + return oue; + } + + public int getSubsequence() { + return subsequence; + } + + public boolean isLast() { + return last; + } + + @Override + public Channel getChannel() { + return ce.getChannel(); + } + + @Override + public ChannelFuture getFuture() { + return ce.getFuture(); + } + + public ChannelEvent getChannelEvent() { + return ce; + } +} diff --git a/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedUpstreamMessageEvent.java b/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedUpstreamMessageEvent.java new file mode 100644 index 00000000000..7343b29b6c5 --- /dev/null +++ b/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedUpstreamMessageEvent.java @@ -0,0 +1,25 @@ +package org.elasticsearch.http.netty.pipelining; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.UpstreamMessageEvent; + +import java.net.SocketAddress; + +/** + * Permits upstream message events to be ordered. + * + * @author Christopher Hunt + */ +public class OrderedUpstreamMessageEvent extends UpstreamMessageEvent { + final int sequence; + + public OrderedUpstreamMessageEvent(final int sequence, final Channel channel, final Object msg, final SocketAddress remoteAddress) { + super(channel, msg, remoteAddress); + this.sequence = sequence; + } + + public int getSequence() { + return sequence; + } + +} diff --git a/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java b/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java new file mode 100644 index 00000000000..fe83258a726 --- /dev/null +++ b/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java @@ -0,0 +1,163 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http.netty; + +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.channel.*; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.handler.codec.http.*; + +import java.io.Closeable; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.HOST; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Tiny helper + */ +public class NettyHttpClient implements Closeable { + + private static final Function FUNCTION_RESPONSE_TO_CONTENT = new Function() { + @Override + public String apply(HttpResponse response) { + return response.getContent().toString(Charsets.UTF_8); + } + }; + + private static final Function FUNCTION_RESPONSE_OPAQUE_ID = new Function() { + @Override + public String apply(HttpResponse response) { + return response.headers().get("X-Opaque-Id"); + } + }; + + public static Collection returnHttpResponseBodies(Collection responses) { + return Collections2.transform(responses, FUNCTION_RESPONSE_TO_CONTENT); + } + + public static Collection returnOpaqueIds(Collection responses) { + return Collections2.transform(responses, FUNCTION_RESPONSE_OPAQUE_ID); + } + + private final ClientBootstrap clientBootstrap; + + public NettyHttpClient() { + clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());; + } + + public Collection sendRequests(SocketAddress remoteAddress, String... uris) throws InterruptedException { + return sendRequests(remoteAddress, -1, uris); + } + + public synchronized Collection sendRequests(SocketAddress remoteAddress, long expectedMaxDuration, String... uris) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(uris.length); + final Collection content = Collections.synchronizedList(new ArrayList(uris.length)); + + clientBootstrap.setPipelineFactory(new CountDownLatchPipelineFactory(latch, content)); + + ChannelFuture channelFuture = null; + try { + channelFuture = clientBootstrap.connect(remoteAddress); + channelFuture.await(1000); + + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < uris.length; i++) { + final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); + channelFuture.getChannel().write(httpRequest); + } + latch.await(); + + long duration = System.currentTimeMillis() - startTime; + // make sure the request were executed in parallel + if (expectedMaxDuration > 0) { + assertThat(duration, is(lessThan(expectedMaxDuration))); + } + } finally { + if (channelFuture != null) { + channelFuture.getChannel().close(); + } + } + + + return content; + } + + @Override + public void close() { + clientBootstrap.shutdown(); + clientBootstrap.releaseExternalResources(); + } + + /** + * helper factory which adds returned data to a list and uses a count down latch to decide when done + */ + public static class CountDownLatchPipelineFactory implements ChannelPipelineFactory { + private final CountDownLatch latch; + private final Collection content; + + public CountDownLatchPipelineFactory(CountDownLatch latch, Collection content) { + this.latch = latch; + this.content = content; + } + + @Override + public ChannelPipeline getPipeline() throws Exception { + final int maxBytes = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt(); + return Channels.pipeline( + new HttpClientCodec(), + new HttpChunkAggregator(maxBytes), + new SimpleChannelUpstreamHandler() { + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) { + final Object message = e.getMessage(); + + if (message instanceof HttpResponse) { + HttpResponse response = (HttpResponse) message; + content.add(response); + } + + latch.countDown(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { + super.exceptionCaught(ctx, e); + latch.countDown(); + } + }); + } + } + +} diff --git a/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTest.java b/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTest.java new file mode 100644 index 00000000000..0c2a981b0d4 --- /dev/null +++ b/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTest.java @@ -0,0 +1,227 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http.netty; + +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent; +import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.test.cache.recycler.MockBigArrays; +import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.ThreadPool; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.*; +import org.jboss.netty.handler.codec.http.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.http.netty.NettyHttpClient.returnHttpResponseBodies; +import static org.elasticsearch.http.netty.NettyHttpServerTransport.HttpChannelPipelineFactory; +import static org.hamcrest.Matchers.*; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CLOSE; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_0; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * This test just tests, if he pipelining works in general with out any connection the elasticsearch handler + */ +public class NettyHttpServerPipeliningTest extends ElasticsearchTestCase { + + private NetworkService networkService; + private ThreadPool threadPool; + private MockPageCacheRecycler mockPageCacheRecycler; + private MockBigArrays bigArrays; + private CustomNettyHttpServerTransport httpServerTransport; + + @Before + public void setup() throws Exception { + networkService = new NetworkService(ImmutableSettings.EMPTY); + threadPool = new ThreadPool("test"); + mockPageCacheRecycler = new MockPageCacheRecycler(ImmutableSettings.EMPTY, threadPool); + bigArrays = new MockBigArrays(ImmutableSettings.EMPTY, mockPageCacheRecycler, new NoneCircuitBreakerService()); + } + + @After + public void shutdown() throws Exception { + if (threadPool != null) { + threadPool.shutdownNow(); + } + if (httpServerTransport != null) { + httpServerTransport.close(); + } + } + + @Test + @TestLogging("_root:DEBUG") + public void testThatHttpPipeliningWorksWhenEnabled() throws Exception { + Settings settings = settingsBuilder().put("http.pipelining", true).build(); + httpServerTransport = new CustomNettyHttpServerTransport(settings); + httpServerTransport.start(); + InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress(); + + List requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast"); + long maxdurationInMilliSeconds = 1200; + try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { + Collection responses = nettyHttpClient.sendRequests(transportAddress.address(), maxdurationInMilliSeconds, requests.toArray(new String[]{})); + Collection responseBodies = returnHttpResponseBodies(responses); + assertThat(responseBodies, contains("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast")); + } + } + + @Test + @TestLogging("_root:TRACE") + public void testThatHttpPipeliningCanBeDisabled() throws Exception { + Settings settings = settingsBuilder().put("http.pipelining", false).build(); + httpServerTransport = new CustomNettyHttpServerTransport(settings); + httpServerTransport.start(); + InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress(); + + List requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500"); + long maxdurationInMilliSeconds = 1200; + try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { + Collection responses = nettyHttpClient.sendRequests(transportAddress.address(), maxdurationInMilliSeconds, requests.toArray(new String[]{})); + List responseBodies = Lists.newArrayList(returnHttpResponseBodies(responses)); + // we cannot be sure about the order of the fast requests, but the slow ones should have to be last + assertThat(responseBodies, hasSize(5)); + assertThat(responseBodies.get(3), is("/slow?sleep=500")); + assertThat(responseBodies.get(4), is("/slow?sleep=1000")); + } + } + + class CustomNettyHttpServerTransport extends NettyHttpServerTransport { + + private final ExecutorService executorService; + + public CustomNettyHttpServerTransport(Settings settings) { + super(settings, NettyHttpServerPipeliningTest.this.networkService, NettyHttpServerPipeliningTest.this.bigArrays); + this.executorService = Executors.newFixedThreadPool(5); + } + + @Override + public ChannelPipelineFactory configureServerChannelPipelineFactory() { + return new CustomHttpChannelPipelineFactory(this, executorService); + } + + @Override + public HttpServerTransport stop() throws ElasticsearchException { + executorService.shutdownNow(); + return super.stop(); + } + } + + private class CustomHttpChannelPipelineFactory extends HttpChannelPipelineFactory { + + private final ExecutorService executorService; + + public CustomHttpChannelPipelineFactory(NettyHttpServerTransport transport, ExecutorService executorService) { + super(transport); + this.executorService = executorService; + } + + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = super.getPipeline(); + pipeline.replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService)); + return pipeline; + } + } + + class PossiblySlowUpstreamHandler extends SimpleChannelUpstreamHandler { + + private final ExecutorService executorService; + + public PossiblySlowUpstreamHandler(ExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception { + executorService.submit(new PossiblySlowRunnable(ctx, e)); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { + e.getCause().printStackTrace(); + e.getChannel().close(); + } + } + + class PossiblySlowRunnable implements Runnable { + + private ChannelHandlerContext ctx; + private MessageEvent e; + + public PossiblySlowRunnable(ChannelHandlerContext ctx, MessageEvent e) { + this.ctx = ctx; + this.e = e; + } + + @Override + public void run() { + HttpRequest request; + OrderedUpstreamMessageEvent oue = null; + if (e instanceof OrderedUpstreamMessageEvent) { + oue = (OrderedUpstreamMessageEvent) e; + request = (HttpRequest) oue.getMessage(); + } else { + request = (HttpRequest) e.getMessage(); + } + + ChannelBuffer buffer = ChannelBuffers.copiedBuffer(request.getUri(), Charsets.UTF_8); + + DefaultHttpResponse httpResponse = new DefaultHttpResponse(HTTP_1_1, OK); + httpResponse.headers().add(CONTENT_LENGTH, buffer.readableBytes()); + httpResponse.setContent(buffer); + + QueryStringDecoder decoder = new QueryStringDecoder(request.getUri()); + + final int timeout = request.getUri().startsWith("/slow") && decoder.getParameters().containsKey("sleep") ? Integer.valueOf(decoder.getParameters().get("sleep").get(0)) : 0; + if (timeout > 0) { + sleep(timeout); + } + + if (oue != null) { + ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, true, httpResponse)); + } else { + ctx.getChannel().write(httpResponse); + } + } + } +} diff --git a/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIntegrationTest.java b/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIntegrationTest.java new file mode 100644 index 00000000000..7372b47d57b --- /dev/null +++ b/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIntegrationTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http.netty; + +import com.google.common.collect.Lists; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Locale; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.hamcrest.Matchers.*; + +/** + * + */ +@ClusterScope(scope = Scope.TEST, numDataNodes = 1) +public class NettyPipeliningDisabledIntegrationTest extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put(InternalNode.HTTP_ENABLED, true).put("http.pipelining", false).build(); + } + + @Test + public void testThatNettyHttpServerDoesNotSupportPipelining() throws Exception { + ensureGreen(); + List requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/"); + + HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress(); + + try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { + Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); + assertThat(responses, hasSize(requests.size())); + + List opaqueIds = Lists.newArrayList(returnOpaqueIds(responses)); + + assertResponsesOutOfOrder(opaqueIds); + } + } + + /** + * checks if all responses are there, but also tests that they are out of order because pipelining is disabled + */ + private void assertResponsesOutOfOrder(List opaqueIds) { + String message = String.format(Locale.ROOT, "Expected returned http message ids to be out of order: %s", opaqueIds); + assertThat(opaqueIds, hasItems("0", "1", "2", "3", "4", "5", "6")); + assertThat(message, opaqueIds, not(contains("0", "1", "2", "3", "4", "5", "6"))); + } +} diff --git a/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIntegrationTest.java b/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIntegrationTest.java new file mode 100644 index 00000000000..e68a57ca94d --- /dev/null +++ b/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIntegrationTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http.netty; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.node.internal.InternalNode; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Locale; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + + +@ClusterScope(scope = Scope.TEST, numDataNodes = 1) +public class NettyPipeliningEnabledIntegrationTest extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put(InternalNode.HTTP_ENABLED, true).put("http.pipelining", true).build(); + } + + @Test + public void testThatNettyHttpServerSupportsPipelining() throws Exception { + List requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/"); + + HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress(); + + try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { + Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); + assertThat(responses, hasSize(5)); + + Collection opaqueIds = returnOpaqueIds(responses); + assertOpaqueIdsInOrder(opaqueIds); + } + } + + private void assertOpaqueIdsInOrder(Collection opaqueIds) { + // check if opaque ids are monotonically increasing + int i = 0; + String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be monotonically increasing, got [" + opaqueIds + "]"); + for (String opaqueId : opaqueIds) { + assertThat(msg, opaqueId, is(String.valueOf(i++))); + } + } + +} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java b/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java new file mode 100644 index 00000000000..110a2d73160 --- /dev/null +++ b/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http.netty.pipelining; + +import org.elasticsearch.test.ElasticsearchTestCase; +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.*; +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; +import org.jboss.netty.handler.codec.http.*; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timeout; +import org.jboss.netty.util.TimerTask; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.jboss.netty.buffer.ChannelBuffers.EMPTY_BUFFER; +import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CHUNKED; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import static org.jboss.netty.util.CharsetUtil.UTF_8; + +/** + * + */ +public class HttpPipeliningHandlerTest extends ElasticsearchTestCase { + + private static final long RESPONSE_TIMEOUT = 10000L; + private static final long CONNECTION_TIMEOUT = 10000L; + private static final String CONTENT_TYPE_TEXT = "text/plain; charset=UTF-8"; + // TODO make me random + private static final InetSocketAddress HOST_ADDR = new InetSocketAddress("127.0.0.1", 9080); + private static final String PATH1 = "/1"; + private static final String PATH2 = "/2"; + private static final String SOME_RESPONSE_TEXT = "some response for "; + + private ClientBootstrap clientBootstrap; + private ServerBootstrap serverBootstrap; + + private CountDownLatch responsesIn; + private final List responses = new ArrayList<>(2); + + private HashedWheelTimer timer; + + @Before + public void startBootstraps() { + clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory()); + + clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline( + new HttpClientCodec(), + new ClientHandler() + ); + } + }); + + serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory()); + + serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline( + new HttpRequestDecoder(), + new HttpResponseEncoder(), + new HttpPipeliningHandler(10000), + new ServerHandler() + ); + } + }); + + serverBootstrap.bind(HOST_ADDR); + + timer = new HashedWheelTimer(); + } + + @After + public void releaseResources() { + timer.stop(); + + serverBootstrap.shutdown(); + serverBootstrap.releaseExternalResources(); + clientBootstrap.shutdown(); + clientBootstrap.releaseExternalResources(); + } + + @Test + public void shouldReturnMessagesInOrder() throws InterruptedException { + responsesIn = new CountDownLatch(1); + responses.clear(); + + final ChannelFuture connectionFuture = clientBootstrap.connect(HOST_ADDR); + + assertTrue(connectionFuture.await(CONNECTION_TIMEOUT)); + final Channel clientChannel = connectionFuture.getChannel(); + + final HttpRequest request1 = new DefaultHttpRequest( + HTTP_1_1, HttpMethod.GET, PATH1); + request1.headers().add(HOST, HOST_ADDR.toString()); + + final HttpRequest request2 = new DefaultHttpRequest( + HTTP_1_1, HttpMethod.GET, PATH2); + request2.headers().add(HOST, HOST_ADDR.toString()); + + clientChannel.write(request1); + clientChannel.write(request2); + + responsesIn.await(RESPONSE_TIMEOUT, MILLISECONDS); + + assertTrue(responses.contains(SOME_RESPONSE_TEXT + PATH1)); + assertTrue(responses.contains(SOME_RESPONSE_TEXT + PATH2)); + } + + public class ClientHandler extends SimpleChannelUpstreamHandler { + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) { + final Object message = e.getMessage(); + if (message instanceof HttpChunk) { + final HttpChunk response = (HttpChunk) e.getMessage(); + if (!response.isLast()) { + final String content = response.getContent().toString(UTF_8); + responses.add(content); + if (content.equals(SOME_RESPONSE_TEXT + PATH2)) { + responsesIn.countDown(); + } + } + } + } + } + + public class ServerHandler extends SimpleChannelUpstreamHandler { + private final AtomicBoolean sendFinalChunk = new AtomicBoolean(false); + + @Override + public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws InterruptedException { + final HttpRequest request = (HttpRequest) e.getMessage(); + + final OrderedUpstreamMessageEvent oue = (OrderedUpstreamMessageEvent) e; + final String uri = request.getUri(); + + final HttpResponse initialChunk = new DefaultHttpResponse(HTTP_1_1, OK); + initialChunk.headers().add(CONTENT_TYPE, CONTENT_TYPE_TEXT); + initialChunk.headers().add(CONNECTION, KEEP_ALIVE); + initialChunk.headers().add(TRANSFER_ENCODING, CHUNKED); + + ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, false, initialChunk)); + + timer.newTimeout(new ChunkWriter(ctx, e, uri, oue, 1), 0, MILLISECONDS); + } + + private class ChunkWriter implements TimerTask { + private final ChannelHandlerContext ctx; + private final MessageEvent e; + private final String uri; + private final OrderedUpstreamMessageEvent oue; + private final int subSequence; + + public ChunkWriter(final ChannelHandlerContext ctx, final MessageEvent e, final String uri, + final OrderedUpstreamMessageEvent oue, final int subSequence) { + this.ctx = ctx; + this.e = e; + this.uri = uri; + this.oue = oue; + this.subSequence = subSequence; + } + + @Override + public void run(final Timeout timeout) { + if (sendFinalChunk.get() && subSequence > 1) { + final HttpChunk finalChunk = new DefaultHttpChunk(EMPTY_BUFFER); + ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, subSequence, true, finalChunk)); + } else { + final HttpChunk chunk = new DefaultHttpChunk(copiedBuffer(SOME_RESPONSE_TEXT + uri, UTF_8)); + ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, subSequence, false, chunk)); + + timer.newTimeout(new ChunkWriter(ctx, e, uri, oue, subSequence + 1), 0, MILLISECONDS); + + if (uri.equals(PATH2)) { + sendFinalChunk.set(true); + } + } + } + } + } +} diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index ba0abedc88b..7a1b072d480 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -1594,6 +1594,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException { int numClientNodes = InternalTestCluster.DEFAULT_NUM_CLIENT_NODES; boolean enableRandomBenchNodes = InternalTestCluster.DEFAULT_ENABLE_RANDOM_BENCH_NODES; + boolean enableHttpPipelining = InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING; int minNumDataNodes = InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES; int maxNumDataNodes = InternalTestCluster.DEFAULT_MAX_NUM_DATA_NODES; SettingsSource settingsSource = InternalTestCluster.DEFAULT_SETTINGS_SOURCE; @@ -1660,7 +1661,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase } return new InternalTestCluster(seed, minNumDataNodes, maxNumDataNodes, clusterName(scope.name(), Integer.toString(CHILD_JVM_ID), seed), settingsSource, numClientNodes, - enableRandomBenchNodes, CHILD_JVM_ID, nodePrefix); + enableRandomBenchNodes, enableHttpPipelining, CHILD_JVM_ID, nodePrefix); } /** diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index 11ccd207b93..152454f0779 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -152,6 +152,7 @@ public final class InternalTestCluster extends TestCluster { static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1; static final boolean DEFAULT_ENABLE_RANDOM_BENCH_NODES = true; + static final boolean DEFAULT_ENABLE_HTTP_PIPELINING = true; public static final String NODE_MODE = nodeMode(); @@ -193,13 +194,13 @@ public final class InternalTestCluster extends TestCluster { private ServiceDisruptionScheme activeDisruptionScheme; public InternalTestCluster(long clusterSeed, int minNumDataNodes, int maxNumDataNodes, String clusterName, int numClientNodes, boolean enableRandomBenchNodes, - int jvmOrdinal, String nodePrefix) { - this(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix); + boolean enableHttpPipelining, int jvmOrdinal, String nodePrefix) { + this(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix); } public InternalTestCluster(long clusterSeed, int minNumDataNodes, int maxNumDataNodes, String clusterName, SettingsSource settingsSource, int numClientNodes, - boolean enableRandomBenchNodes, + boolean enableRandomBenchNodes, boolean enableHttpPipelining, int jvmOrdinal, String nodePrefix) { super(clusterSeed); this.clusterName = clusterName; @@ -267,6 +268,7 @@ public final class InternalTestCluster extends TestCluster { builder.put("config.ignore_system_properties", true); builder.put("node.mode", NODE_MODE); builder.put("script.disable_dynamic", false); + builder.put("http.pipelining", enableHttpPipelining); builder.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false); if (Strings.hasLength(System.getProperty("es.logger.level"))) { builder.put("logger.level", System.getProperty("es.logger.level")); diff --git a/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index 78bdb00ab5b..203f53a2d64 100644 --- a/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -50,11 +50,12 @@ public class InternalTestClusterTests extends ElasticsearchTestCase { SettingsSource settingsSource = SettingsSource.EMPTY; int numClientNodes = randomIntBetween(0, 10); boolean enableRandomBenchNodes = randomBoolean(); + boolean enableHttpPipelining = randomBoolean(); int jvmOrdinal = randomIntBetween(0, 10); String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); - InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix); - InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix); + InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix); + InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix); assertClusters(cluster0, cluster1, true); } @@ -94,11 +95,12 @@ public class InternalTestClusterTests extends ElasticsearchTestCase { SettingsSource settingsSource = SettingsSource.EMPTY; int numClientNodes = randomIntBetween(0, 2); boolean enableRandomBenchNodes = randomBoolean(); + boolean enableHttpPipelining = randomBoolean(); int jvmOrdinal = randomIntBetween(0, 10); String nodePrefix = "foobar"; - InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix); - InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix); + InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix); + InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix); assertClusters(cluster0, cluster1, false); long seed = randomLong(); diff --git a/src/test/java/org/elasticsearch/test/transport/NettyTransportMultiPortIntegrationTests.java b/src/test/java/org/elasticsearch/test/transport/NettyTransportMultiPortIntegrationTests.java index 40983cddcb2..c8baa80e3fa 100644 --- a/src/test/java/org/elasticsearch/test/transport/NettyTransportMultiPortIntegrationTests.java +++ b/src/test/java/org/elasticsearch/test/transport/NettyTransportMultiPortIntegrationTests.java @@ -18,7 +18,6 @@ */ package org.elasticsearch.test.transport; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.client.transport.TransportClient; diff --git a/src/test/java/org/elasticsearch/tribe/TribeTests.java b/src/test/java/org/elasticsearch/tribe/TribeTests.java index e4bf9e05c60..c30dd92b6b9 100644 --- a/src/test/java/org/elasticsearch/tribe/TribeTests.java +++ b/src/test/java/org/elasticsearch/tribe/TribeTests.java @@ -64,7 +64,7 @@ public class TribeTests extends ElasticsearchIntegrationTest { public static void setupSecondCluster() throws Exception { ElasticsearchIntegrationTest.beforeClass(); // create another cluster - cluster2 = new InternalTestCluster(randomLong(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, CHILD_JVM_ID, SECOND_CLUSTER_NODE_PREFIX); + cluster2 = new InternalTestCluster(randomLong(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, false, CHILD_JVM_ID, SECOND_CLUSTER_NODE_PREFIX); cluster2.beforeTest(getRandom(), 0.1); cluster2.ensureAtLeastNumDataNodes(2); }