diff --git a/docs/reference/modules/http.asciidoc b/docs/reference/modules/http.asciidoc
index 3f7d1b6109f..0ae864fa5e2 100644
--- a/docs/reference/modules/http.asciidoc
+++ b/docs/reference/modules/http.asciidoc
@@ -15,8 +15,6 @@ when connecting for better performance and try to get your favorite
client not to do
http://en.wikipedia.org/wiki/Chunked_transfer_encoding[HTTP chunking].
-IMPORTANT: HTTP pipelining is not supported and should be disabled in your HTTP client.
-
[float]
=== Settings
@@ -69,6 +67,9 @@ be cached for. Defaults to `1728000` (20 days)
header should be returned. Note: This header is only returned, when the setting is
set to `true`. Defaults to `false`
+|`http.pipelining` |Enable or disable HTTP pipelining, defaults to `true`.
+
+|`http.pipelining.max_events` |The maximum number of events to be queued up in memory before an HTTP connection is closed, defaults to `10000`.
|=======================================================================
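As an illustration of how these settings are consumed, the integration tests later in this change set them through the Java settings API. A minimal sketch, assuming the 1.x `ImmutableSettings` builder that those tests also use (the class name `PipeliningSettingsSketch` is hypothetical):

    import org.elasticsearch.common.settings.Settings;

    import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;

    public class PipeliningSettingsSketch {
        public static void main(String[] args) {
            // build node settings that enable pipelining and cap the number of
            // responses queued in memory before the connection is closed
            Settings settings = settingsBuilder()
                    .put("http.pipelining", true)
                    .put("http.pipelining.max_events", 10000)
                    .build();
            System.out.println(settings.getAsBoolean("http.pipelining", true));
        }
    }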
diff --git a/pom.xml b/pom.xml
index 284418b5de0..463ef2287e4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1406,17 +1406,18 @@
<exclude>src/test/java/org/elasticsearch/**/*.java</exclude>
- <exclude>src/main/java/org/elasticsearch/common/inject/**</exclude>
+ <exclude>src/main/java/org/elasticsearch/common/inject/**</exclude>
<exclude>src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java</exclude>
<exclude>src/main/java/org/elasticsearch/common/lucene/search/XBooleanFilter.java</exclude>
<exclude>src/main/java/org/elasticsearch/common/lucene/search/XFilteredQuery.java</exclude>
<exclude>src/main/java/org/apache/lucene/queryparser/XSimpleQueryParser.java</exclude>
<exclude>src/main/java/org/apache/lucene/**/X*.java</exclude>
- <exclude>src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java</exclude>
-
+ <exclude>src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java</exclude>
<exclude>src/test/java/org/elasticsearch/search/aggregations/metrics/GroupTree.java</exclude>
+
+ <exclude>src/main/java/org/elasticsearch/http/netty/pipelining/**</exclude>
diff --git a/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java b/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java
index f4ab7a0d808..d62dc0abbda 100644
--- a/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java
+++ b/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java
@@ -19,6 +19,7 @@
package org.elasticsearch.http.netty;
+import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
import org.elasticsearch.rest.support.RestUtils;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.HttpRequest;
@@ -34,19 +35,33 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
private final NettyHttpServerTransport serverTransport;
private final Pattern corsPattern;
+ private final boolean httpPipeliningEnabled;
public HttpRequestHandler(NettyHttpServerTransport serverTransport) {
this.serverTransport = serverTransport;
this.corsPattern = RestUtils.getCorsSettingRegex(serverTransport.settings());
+ this.httpPipeliningEnabled = serverTransport.pipelining;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
+ HttpRequest request;
+ OrderedUpstreamMessageEvent oue = null;
+ if (this.httpPipeliningEnabled && e instanceof OrderedUpstreamMessageEvent) {
+ oue = (OrderedUpstreamMessageEvent) e;
+ request = (HttpRequest) oue.getMessage();
+ } else {
+ request = (HttpRequest) e.getMessage();
+ }
+
// the netty HTTP handling always copies over the buffer to its own buffer, either in NioWorker internally
// when reading, or using a cumulation buffer
NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());
- serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), httpRequest, corsPattern));
+ if (oue != null) {
+ serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, corsPattern, oue));
+ } else {
+ serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, corsPattern));
+ }
super.messageReceived(ctx, e);
}
diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java
index ce3e6e1bff6..1693b199d7c 100644
--- a/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java
+++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java
@@ -28,14 +28,14 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.netty.NettyUtils;
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
import org.elasticsearch.http.HttpChannel;
+import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
+import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.support.RestUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.*;
import java.util.List;
@@ -61,16 +61,22 @@ public class NettyHttpChannel extends HttpChannel {
private final NettyHttpServerTransport transport;
private final Channel channel;
private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest;
+ private OrderedUpstreamMessageEvent orderedUpstreamMessageEvent = null;
private Pattern corsPattern;
- public NettyHttpChannel(NettyHttpServerTransport transport, Channel channel, NettyHttpRequest request, Pattern corsPattern) {
+ public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, Pattern corsPattern) {
super(request);
this.transport = transport;
- this.channel = channel;
+ this.channel = request.getChannel();
this.nettyRequest = request.request();
this.corsPattern = corsPattern;
}
+ public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, Pattern corsPattern, OrderedUpstreamMessageEvent orderedUpstreamMessageEvent) {
+ this(transport, request, corsPattern);
+ this.orderedUpstreamMessageEvent = orderedUpstreamMessageEvent;
+ }
+
@Override
public BytesStreamOutput newBytesOutput() {
return new ReleasableBytesStreamOutput(transport.bigArrays);
@@ -185,14 +191,25 @@ public class NettyHttpChannel extends HttpChannel {
}
}
- ChannelFuture future = channel.write(resp);
+ ChannelFuture future;
+
+ if (orderedUpstreamMessageEvent != null) {
+ OrderedDownstreamChannelEvent downstreamChannelEvent = new OrderedDownstreamChannelEvent(orderedUpstreamMessageEvent, 0, true, resp);
+ future = downstreamChannelEvent.getFuture();
+ channel.getPipeline().sendDownstream(downstreamChannelEvent);
+ } else {
+ future = channel.write(resp);
+ }
+
if (response.contentThreadSafe() && content instanceof Releasable) {
future.addListener(new ReleaseChannelFutureListener((Releasable) content));
addedReleaseListener = true;
}
+
if (close) {
future.addListener(ChannelFutureListener.CLOSE);
}
+
} finally {
if (!addedReleaseListener && content instanceof Releasable) {
((Releasable) content).close();
diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java
index d392f597cd2..4a55112ba80 100644
--- a/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java
+++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java
@@ -145,6 +145,10 @@ public class NettyHttpRequest extends HttpRequest {
return channel.getLocalAddress();
}
+ public Channel getChannel() {
+ return channel;
+ }
+
@Override
public String header(String name) {
return request.headers().get(name);
diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
index 5fec6dca6ff..62baa59a72b 100644
--- a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
+++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
@@ -37,6 +37,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.http.*;
+import org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.transport.BindTransportException;
import org.jboss.netty.bootstrap.ServerBootstrap;
@@ -72,6 +73,13 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport> implements HttpServerTransport {
+ public static final String SETTING_PIPELINING = "http.pipelining";
+ public static final boolean DEFAULT_SETTING_PIPELINING = true;
+ public static final String SETTING_PIPELINING_MAX_EVENTS = "http.pipelining.max_events";
+ public static final int DEFAULT_SETTING_PIPELINING_MAX_EVENTS = 10000;
+
+ protected final boolean pipelining;
+ protected final int pipeliningMaxEvents;
@@ -140,6 +147,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport> implements HttpServerTransport {
+ this.pipelining = settings.getAsBoolean(SETTING_PIPELINING, DEFAULT_SETTING_PIPELINING);
+ this.pipeliningMaxEvents = settings.getAsInt(SETTING_PIPELINING_MAX_EVENTS, DEFAULT_SETTING_PIPELINING_MAX_EVENTS);
if (maxContentLength.bytes() > Integer.MAX_VALUE) {
@@ -174,8 +188,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport> implements HttpServerTransport {
- logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], receive_predictor[{}->{}]",
- maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax);
+ logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], receive_predictor[{}->{}], pipelining[{}], pipelining_max_events[{}]",
+ maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax, pipelining, pipeliningMaxEvents);
}
public Settings settings() {
@@ -370,6 +384,9 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport> implements HttpServerTransport {
if (transport.compression) {
pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
}
+ if (transport.pipelining) {
+ pipeline.addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents));
+ }
pipeline.addLast("handler", requestHandler);
return pipeline;
}
diff --git a/src/main/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandler.java b/src/main/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandler.java
new file mode 100644
--- /dev/null
+++ b/src/main/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandler.java
@@ -0,0 +1,108 @@
+package org.elasticsearch.http.netty.pipelining;
+
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their
+ * corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will
+ * cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely
+ * OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects.
+ *
+ * @author Christopher Hunt
+ */
+public class HttpPipeliningHandler extends SimpleChannelHandler {
+
+ public static final int INITIAL_EVENTS_HELD = 3;
+
+ private final int maxEventsHeld;
+
+ private int sequence;
+ private int nextRequiredSequence;
+ private int nextRequiredSubsequence;
+
+ private final Queue<OrderedDownstreamChannelEvent> holdingQueue;
+
+ /**
+ * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel
+ * connection. This is required as events cannot queue up indefinitely; we would run out of
+ * memory if this was the case.
+ */
+ public HttpPipeliningHandler(final int maxEventsHeld) {
+ this.maxEventsHeld = maxEventsHeld;
+
+ holdingQueue = new PriorityQueue<>(INITIAL_EVENTS_HELD, new Comparator<OrderedDownstreamChannelEvent>() {
+ @Override
+ public int compare(OrderedDownstreamChannelEvent o1, OrderedDownstreamChannelEvent o2) {
+ final int delta = o1.getOrderedUpstreamMessageEvent().getSequence() - o2.getOrderedUpstreamMessageEvent().getSequence();
+ if (delta == 0) {
+ return o1.getSubsequence() - o2.getSubsequence();
+ } else {
+ return delta;
+ }
+ }
+ });
+ }
+
+ public int getMaxEventsHeld() {
+ return maxEventsHeld;
+ }
+
+ @Override
+ public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) {
+ final Object msg = e.getMessage();
+ if (msg instanceof HttpRequest) {
+ ctx.sendUpstream(new OrderedUpstreamMessageEvent(sequence++, e.getChannel(), msg, e.getRemoteAddress()));
+ } else {
+ ctx.sendUpstream(e);
+ }
+ }
+
+ @Override
+ public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
+ throws Exception {
+ if (e instanceof OrderedDownstreamChannelEvent) {
+
+ boolean channelShouldClose = false;
+
+ synchronized (holdingQueue) {
+ if (holdingQueue.size() < maxEventsHeld) {
+
+ final OrderedDownstreamChannelEvent currentEvent = (OrderedDownstreamChannelEvent) e;
+ holdingQueue.add(currentEvent);
+
+ while (!holdingQueue.isEmpty()) {
+ final OrderedDownstreamChannelEvent nextEvent = holdingQueue.peek();
+
+ if (nextEvent.getOrderedUpstreamMessageEvent().getSequence() != nextRequiredSequence ||
+ nextEvent.getSubsequence() != nextRequiredSubsequence) {
+ break;
+ }
+ holdingQueue.remove();
+ ctx.sendDownstream(nextEvent.getChannelEvent());
+ if (nextEvent.isLast()) {
+ ++nextRequiredSequence;
+ nextRequiredSubsequence = 0;
+ } else {
+ ++nextRequiredSubsequence;
+ }
+ }
+
+ } else {
+ channelShouldClose = true;
+ }
+ }
+
+ if (channelShouldClose) {
+ Channels.close(e.getChannel());
+ }
+ } else {
+ super.handleDownstream(ctx, e);
+ }
+ }
+
+}
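The comparator above orders held-back responses by request sequence first and by subsequence within a response second; handleDownstream then flushes from the queue head only while the exact next expected pair is present. A self-contained sketch of that ordering rule, with hypothetical {sequence, subsequence} int pairs standing in for OrderedDownstreamChannelEvent:

    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class PipeliningOrderSketch {
        public static void main(String[] args) {
            // order events by sequence, then by subsequence, as the handler does
            PriorityQueue<int[]> holdingQueue = new PriorityQueue<>(3, new Comparator<int[]>() {
                @Override
                public int compare(int[] a, int[] b) {
                    int delta = a[0] - b[0];
                    return delta != 0 ? delta : a[1] - b[1];
                }
            });
            // responses completing out of order: request 2 finishes first
            holdingQueue.add(new int[]{2, 0});
            holdingQueue.add(new int[]{0, 0});
            holdingQueue.add(new int[]{1, 0});
            while (!holdingQueue.isEmpty()) {
                int[] event = holdingQueue.poll();
                System.out.println("flush sequence=" + event[0] + " subsequence=" + event[1]);
            }
            // prints sequences 0, 1, 2: the order in which the requests arrived
        }
    }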
diff --git a/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedDownstreamChannelEvent.java b/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedDownstreamChannelEvent.java
new file mode 100644
index 00000000000..6b713a08020
--- /dev/null
+++ b/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedDownstreamChannelEvent.java
@@ -0,0 +1,77 @@
+package org.elasticsearch.http.netty.pipelining;
+
+import org.jboss.netty.channel.*;
+
+/**
+ * Permits downstream channel events to be ordered and signalled as to whether more are to come for a given sequence.
+ *
+ * @author Christopher Hunt
+ */
+public class OrderedDownstreamChannelEvent implements ChannelEvent {
+
+ final ChannelEvent ce;
+ final OrderedUpstreamMessageEvent oue;
+ final int subsequence;
+ final boolean last;
+
+ /**
+ * Construct a downstream channel event for all types of events.
+ *
+ * @param oue the OrderedUpstreamMessageEvent that this response is associated with
+ * @param subsequence the position of this event within the overall sequence of responses for the request
+ * @param last when set to true this indicates that there are no more responses to be received for the
+ * original OrderedUpstreamMessageEvent
+ */
+ public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oue, final int subsequence, boolean last,
+ final ChannelEvent ce) {
+ this.oue = oue;
+ this.ce = ce;
+ this.subsequence = subsequence;
+ this.last = last;
+ }
+
+ /**
+ * Convenience constructor signifying that this downstream message event is the last one for the given sequence,
+ * and that there is only one response.
+ */
+ public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oe,
+ final Object message) {
+ this(oe, 0, true, message);
+ }
+
+ /**
+ * Convenience constructor for passing message events.
+ */
+ public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oue, final int subsequence, boolean last,
+ final Object message) {
+ this(oue, subsequence, last, new DownstreamMessageEvent(oue.getChannel(), Channels.future(oue.getChannel()),
+ message, oue.getRemoteAddress()));
+
+ }
+
+ public OrderedUpstreamMessageEvent getOrderedUpstreamMessageEvent() {
+ return oue;
+ }
+
+ public int getSubsequence() {
+ return subsequence;
+ }
+
+ public boolean isLast() {
+ return last;
+ }
+
+ @Override
+ public Channel getChannel() {
+ return ce.getChannel();
+ }
+
+ @Override
+ public ChannelFuture getFuture() {
+ return ce.getFuture();
+ }
+
+ public ChannelEvent getChannelEvent() {
+ return ce;
+ }
+}
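For multi-part responses the subsequence and last flag work together: every event for one request carries that request's sequence, the subsequence counts up per part, and only the final part sets last to true. A hedged sketch of the pattern (class and method names are illustrative; the handler test at the end of this change does the same with real chunks):

    import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
    import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.codec.http.HttpChunk;
    import org.jboss.netty.handler.codec.http.HttpResponse;

    class ChunkedResponseSketch {
        // write one logical response as three ordered events: the headers,
        // one body chunk, and a terminating chunk flagged as last
        static void writeChunked(ChannelHandlerContext ctx, OrderedUpstreamMessageEvent oue,
                                 HttpResponse header, HttpChunk body, HttpChunk trailer) {
            ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, false, header));
            ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 1, false, body));
            ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 2, true, trailer)); // completes the sequence
        }
    }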
diff --git a/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedUpstreamMessageEvent.java b/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedUpstreamMessageEvent.java
new file mode 100644
index 00000000000..7343b29b6c5
--- /dev/null
+++ b/src/main/java/org/elasticsearch/http/netty/pipelining/OrderedUpstreamMessageEvent.java
@@ -0,0 +1,25 @@
+package org.elasticsearch.http.netty.pipelining;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.UpstreamMessageEvent;
+
+import java.net.SocketAddress;
+
+/**
+ * Permits upstream message events to be ordered.
+ *
+ * @author Christopher Hunt
+ */
+public class OrderedUpstreamMessageEvent extends UpstreamMessageEvent {
+ final int sequence;
+
+ public OrderedUpstreamMessageEvent(final int sequence, final Channel channel, final Object msg, final SocketAddress remoteAddress) {
+ super(channel, msg, remoteAddress);
+ this.sequence = sequence;
+ }
+
+ public int getSequence() {
+ return sequence;
+ }
+
+}
diff --git a/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java b/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java
new file mode 100644
index 00000000000..fe83258a726
--- /dev/null
+++ b/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.http.netty;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.*;
+
+import java.io.Closeable;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.HOST;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Tiny helper to send HTTP requests over Netty.
+ */
+public class NettyHttpClient implements Closeable {
+
+ private static final Function<? super HttpResponse, String> FUNCTION_RESPONSE_TO_CONTENT = new Function<HttpResponse, String>() {
+ @Override
+ public String apply(HttpResponse response) {
+ return response.getContent().toString(Charsets.UTF_8);
+ }
+ };
+
+ private static final Function<? super HttpResponse, String> FUNCTION_RESPONSE_OPAQUE_ID = new Function<HttpResponse, String>() {
+ @Override
+ public String apply(HttpResponse response) {
+ return response.headers().get("X-Opaque-Id");
+ }
+ };
+
+ public static Collection<String> returnHttpResponseBodies(Collection<HttpResponse> responses) {
+ return Collections2.transform(responses, FUNCTION_RESPONSE_TO_CONTENT);
+ }
+
+ public static Collection<String> returnOpaqueIds(Collection<HttpResponse> responses) {
+ return Collections2.transform(responses, FUNCTION_RESPONSE_OPAQUE_ID);
+ }
+
+ private final ClientBootstrap clientBootstrap;
+
+ public NettyHttpClient() {
+ clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());
+ }
+
+ public Collection<HttpResponse> sendRequests(SocketAddress remoteAddress, String... uris) throws InterruptedException {
+ return sendRequests(remoteAddress, -1, uris);
+ }
+
+ public synchronized Collection<HttpResponse> sendRequests(SocketAddress remoteAddress, long expectedMaxDuration, String... uris) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(uris.length);
+ final Collection<HttpResponse> content = Collections.synchronizedList(new ArrayList<HttpResponse>(uris.length));
+
+ clientBootstrap.setPipelineFactory(new CountDownLatchPipelineFactory(latch, content));
+
+ ChannelFuture channelFuture = null;
+ try {
+ channelFuture = clientBootstrap.connect(remoteAddress);
+ channelFuture.await(1000);
+
+ long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < uris.length; i++) {
+ final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
+ httpRequest.headers().add(HOST, "localhost");
+ httpRequest.headers().add("X-Opaque-ID", String.valueOf(i));
+ channelFuture.getChannel().write(httpRequest);
+ }
+ latch.await();
+
+ long duration = System.currentTimeMillis() - startTime;
+ // make sure the requests were executed in parallel
+ if (expectedMaxDuration > 0) {
+ assertThat(duration, is(lessThan(expectedMaxDuration)));
+ }
+ } finally {
+ if (channelFuture != null) {
+ channelFuture.getChannel().close();
+ }
+ }
+
+ return content;
+ }
+
+ @Override
+ public void close() {
+ clientBootstrap.shutdown();
+ clientBootstrap.releaseExternalResources();
+ }
+
+ /**
+ * helper factory which adds returned data to a list and uses a count down latch to decide when done
+ */
+ public static class CountDownLatchPipelineFactory implements ChannelPipelineFactory {
+ private final CountDownLatch latch;
+ private final Collection<HttpResponse> content;
+
+ public CountDownLatchPipelineFactory(CountDownLatch latch, Collection<HttpResponse> content) {
+ this.latch = latch;
+ this.content = content;
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ final int maxBytes = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt();
+ return Channels.pipeline(
+ new HttpClientCodec(),
+ new HttpChunkAggregator(maxBytes),
+ new SimpleChannelUpstreamHandler() {
+ @Override
+ public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) {
+ final Object message = e.getMessage();
+
+ if (message instanceof HttpResponse) {
+ HttpResponse response = (HttpResponse) message;
+ content.add(response);
+ }
+
+ latch.countDown();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ super.exceptionCaught(ctx, e);
+ latch.countDown();
+ }
+ });
+ }
+ }
+
+}
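A short usage sketch for this helper, assuming a caller in the same package and a bound HTTP address taken from the transport under test (as the pipelining tests below do; the class and method names are illustrative):

    import org.jboss.netty.handler.codec.http.HttpResponse;

    import java.net.SocketAddress;
    import java.util.Collection;

    class NettyHttpClientUsageSketch {
        // send two pipelined GET requests and collect the response bodies
        static Collection<String> fetch(SocketAddress address) throws InterruptedException {
            try (NettyHttpClient client = new NettyHttpClient()) {
                Collection<HttpResponse> responses = client.sendRequests(address, "/", "/_nodes");
                return NettyHttpClient.returnHttpResponseBodies(responses);
            }
        }
    }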
diff --git a/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTest.java b/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTest.java
new file mode 100644
index 00000000000..0c2a981b0d4
--- /dev/null
+++ b/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.http.netty;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.common.network.NetworkService;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
+import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.elasticsearch.test.cache.recycler.MockBigArrays;
+import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.http.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.http.netty.NettyHttpClient.returnHttpResponseBodies;
+import static org.elasticsearch.http.netty.NettyHttpServerTransport.HttpChannelPipelineFactory;
+import static org.hamcrest.Matchers.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_0;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * This test just tests if pipelining works in general, without any connection to the Elasticsearch handler
+ */
+public class NettyHttpServerPipeliningTest extends ElasticsearchTestCase {
+
+ private NetworkService networkService;
+ private ThreadPool threadPool;
+ private MockPageCacheRecycler mockPageCacheRecycler;
+ private MockBigArrays bigArrays;
+ private CustomNettyHttpServerTransport httpServerTransport;
+
+ @Before
+ public void setup() throws Exception {
+ networkService = new NetworkService(ImmutableSettings.EMPTY);
+ threadPool = new ThreadPool("test");
+ mockPageCacheRecycler = new MockPageCacheRecycler(ImmutableSettings.EMPTY, threadPool);
+ bigArrays = new MockBigArrays(ImmutableSettings.EMPTY, mockPageCacheRecycler, new NoneCircuitBreakerService());
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ if (threadPool != null) {
+ threadPool.shutdownNow();
+ }
+ if (httpServerTransport != null) {
+ httpServerTransport.close();
+ }
+ }
+
+ @Test
+ @TestLogging("_root:DEBUG")
+ public void testThatHttpPipeliningWorksWhenEnabled() throws Exception {
+ Settings settings = settingsBuilder().put("http.pipelining", true).build();
+ httpServerTransport = new CustomNettyHttpServerTransport(settings);
+ httpServerTransport.start();
+ InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();
+
+ List requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast");
+ long maxdurationInMilliSeconds = 1200;
+ try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
+ Collection<HttpResponse> responses = nettyHttpClient.sendRequests(transportAddress.address(), maxdurationInMilliSeconds, requests.toArray(new String[]{}));
+ Collection<String> responseBodies = returnHttpResponseBodies(responses);
+ assertThat(responseBodies, contains("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast"));
+ }
+ }
+
+ @Test
+ @TestLogging("_root:TRACE")
+ public void testThatHttpPipeliningCanBeDisabled() throws Exception {
+ Settings settings = settingsBuilder().put("http.pipelining", false).build();
+ httpServerTransport = new CustomNettyHttpServerTransport(settings);
+ httpServerTransport.start();
+ InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();
+
+ List requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500");
+ long maxdurationInMilliSeconds = 1200;
+ try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
+ Collection<HttpResponse> responses = nettyHttpClient.sendRequests(transportAddress.address(), maxdurationInMilliSeconds, requests.toArray(new String[]{}));
+ List<String> responseBodies = Lists.newArrayList(returnHttpResponseBodies(responses));
+ // we cannot be sure about the order of the fast requests, but the slow ones should be last
+ assertThat(responseBodies, hasSize(5));
+ assertThat(responseBodies.get(3), is("/slow?sleep=500"));
+ assertThat(responseBodies.get(4), is("/slow?sleep=1000"));
+ }
+ }
+
+ class CustomNettyHttpServerTransport extends NettyHttpServerTransport {
+
+ private final ExecutorService executorService;
+
+ public CustomNettyHttpServerTransport(Settings settings) {
+ super(settings, NettyHttpServerPipeliningTest.this.networkService, NettyHttpServerPipeliningTest.this.bigArrays);
+ this.executorService = Executors.newFixedThreadPool(5);
+ }
+
+ @Override
+ public ChannelPipelineFactory configureServerChannelPipelineFactory() {
+ return new CustomHttpChannelPipelineFactory(this, executorService);
+ }
+
+ @Override
+ public HttpServerTransport stop() throws ElasticsearchException {
+ executorService.shutdownNow();
+ return super.stop();
+ }
+ }
+
+ private class CustomHttpChannelPipelineFactory extends HttpChannelPipelineFactory {
+
+ private final ExecutorService executorService;
+
+ public CustomHttpChannelPipelineFactory(NettyHttpServerTransport transport, ExecutorService executorService) {
+ super(transport);
+ this.executorService = executorService;
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = super.getPipeline();
+ pipeline.replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService));
+ return pipeline;
+ }
+ }
+
+ class PossiblySlowUpstreamHandler extends SimpleChannelUpstreamHandler {
+
+ private final ExecutorService executorService;
+
+ public PossiblySlowUpstreamHandler(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ @Override
+ public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
+ executorService.submit(new PossiblySlowRunnable(ctx, e));
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ e.getCause().printStackTrace();
+ e.getChannel().close();
+ }
+ }
+
+ class PossiblySlowRunnable implements Runnable {
+
+ private ChannelHandlerContext ctx;
+ private MessageEvent e;
+
+ public PossiblySlowRunnable(ChannelHandlerContext ctx, MessageEvent e) {
+ this.ctx = ctx;
+ this.e = e;
+ }
+
+ @Override
+ public void run() {
+ HttpRequest request;
+ OrderedUpstreamMessageEvent oue = null;
+ if (e instanceof OrderedUpstreamMessageEvent) {
+ oue = (OrderedUpstreamMessageEvent) e;
+ request = (HttpRequest) oue.getMessage();
+ } else {
+ request = (HttpRequest) e.getMessage();
+ }
+
+ ChannelBuffer buffer = ChannelBuffers.copiedBuffer(request.getUri(), Charsets.UTF_8);
+
+ DefaultHttpResponse httpResponse = new DefaultHttpResponse(HTTP_1_1, OK);
+ httpResponse.headers().add(CONTENT_LENGTH, buffer.readableBytes());
+ httpResponse.setContent(buffer);
+
+ QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());
+
+ final int timeout = request.getUri().startsWith("/slow") && decoder.getParameters().containsKey("sleep") ? Integer.valueOf(decoder.getParameters().get("sleep").get(0)) : 0;
+ if (timeout > 0) {
+ sleep(timeout);
+ }
+
+ if (oue != null) {
+ ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, true, httpResponse));
+ } else {
+ ctx.getChannel().write(httpResponse);
+ }
+ }
+ }
+}
diff --git a/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIntegrationTest.java b/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIntegrationTest.java
new file mode 100644
index 00000000000..7372b47d57b
--- /dev/null
+++ b/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIntegrationTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.http.netty;
+
+import com.google.common.collect.Lists;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.node.internal.InternalNode;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
+import static org.hamcrest.Matchers.*;
+
+/**
+ *
+ */
+@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
+public class NettyPipeliningDisabledIntegrationTest extends ElasticsearchIntegrationTest {
+
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal) {
+ return settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put(InternalNode.HTTP_ENABLED, true).put("http.pipelining", false).build();
+ }
+
+ @Test
+ public void testThatNettyHttpServerDoesNotSupportPipelining() throws Exception {
+ ensureGreen();
+ List requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/");
+
+ HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
+ InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();
+
+ try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
+ Collection<HttpResponse> responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
+ assertThat(responses, hasSize(requests.size()));
+
+ List<String> opaqueIds = Lists.newArrayList(returnOpaqueIds(responses));
+
+ assertResponsesOutOfOrder(opaqueIds);
+ }
+ }
+
+ /**
+ * checks if all responses are there, but also tests that they are out of order because pipelining is disabled
+ */
+ private void assertResponsesOutOfOrder(List<String> opaqueIds) {
+ String message = String.format(Locale.ROOT, "Expected returned http message ids to be out of order: %s", opaqueIds);
+ assertThat(opaqueIds, hasItems("0", "1", "2", "3", "4", "5", "6"));
+ assertThat(message, opaqueIds, not(contains("0", "1", "2", "3", "4", "5", "6")));
+ }
+}
diff --git a/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIntegrationTest.java b/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIntegrationTest.java
new file mode 100644
index 00000000000..e68a57ca94d
--- /dev/null
+++ b/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIntegrationTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.http.netty;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.node.internal.InternalNode;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+
+import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
+import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+
+
+@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
+public class NettyPipeliningEnabledIntegrationTest extends ElasticsearchIntegrationTest {
+
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal) {
+ return settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put(InternalNode.HTTP_ENABLED, true).put("http.pipelining", true).build();
+ }
+
+ @Test
+ public void testThatNettyHttpServerSupportsPipelining() throws Exception {
+ List<String> requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/");
+
+ HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
+ InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();
+
+ try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
+ Collection<HttpResponse> responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
+ assertThat(responses, hasSize(5));
+
+ Collection<String> opaqueIds = returnOpaqueIds(responses);
+ assertOpaqueIdsInOrder(opaqueIds);
+ }
+ }
+
+ private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
+ // check if opaque ids are monotonically increasing
+ int i = 0;
+ String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be monotonically increasing, got [%s]", opaqueIds);
+ for (String opaqueId : opaqueIds) {
+ assertThat(msg, opaqueId, is(String.valueOf(i++)));
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java b/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java
new file mode 100644
index 00000000000..110a2d73160
--- /dev/null
+++ b/src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.http.netty.pipelining;
+
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.jboss.netty.buffer.ChannelBuffers.EMPTY_BUFFER;
+import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CHUNKED;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.jboss.netty.util.CharsetUtil.UTF_8;
+
+/**
+ *
+ */
+public class HttpPipeliningHandlerTest extends ElasticsearchTestCase {
+
+ private static final long RESPONSE_TIMEOUT = 10000L;
+ private static final long CONNECTION_TIMEOUT = 10000L;
+ private static final String CONTENT_TYPE_TEXT = "text/plain; charset=UTF-8";
+ // TODO make me random
+ private static final InetSocketAddress HOST_ADDR = new InetSocketAddress("127.0.0.1", 9080);
+ private static final String PATH1 = "/1";
+ private static final String PATH2 = "/2";
+ private static final String SOME_RESPONSE_TEXT = "some response for ";
+
+ private ClientBootstrap clientBootstrap;
+ private ServerBootstrap serverBootstrap;
+
+ private CountDownLatch responsesIn;
+ private final List<String> responses = new ArrayList<>(2);
+
+ private HashedWheelTimer timer;
+
+ @Before
+ public void startBootstraps() {
+ clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());
+
+ clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ return Channels.pipeline(
+ new HttpClientCodec(),
+ new ClientHandler()
+ );
+ }
+ });
+
+ serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory());
+
+ serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ return Channels.pipeline(
+ new HttpRequestDecoder(),
+ new HttpResponseEncoder(),
+ new HttpPipeliningHandler(10000),
+ new ServerHandler()
+ );
+ }
+ });
+
+ serverBootstrap.bind(HOST_ADDR);
+
+ timer = new HashedWheelTimer();
+ }
+
+ @After
+ public void releaseResources() {
+ timer.stop();
+
+ serverBootstrap.shutdown();
+ serverBootstrap.releaseExternalResources();
+ clientBootstrap.shutdown();
+ clientBootstrap.releaseExternalResources();
+ }
+
+ @Test
+ public void shouldReturnMessagesInOrder() throws InterruptedException {
+ responsesIn = new CountDownLatch(1);
+ responses.clear();
+
+ final ChannelFuture connectionFuture = clientBootstrap.connect(HOST_ADDR);
+
+ assertTrue(connectionFuture.await(CONNECTION_TIMEOUT));
+ final Channel clientChannel = connectionFuture.getChannel();
+
+ final HttpRequest request1 = new DefaultHttpRequest(
+ HTTP_1_1, HttpMethod.GET, PATH1);
+ request1.headers().add(HOST, HOST_ADDR.toString());
+
+ final HttpRequest request2 = new DefaultHttpRequest(
+ HTTP_1_1, HttpMethod.GET, PATH2);
+ request2.headers().add(HOST, HOST_ADDR.toString());
+
+ clientChannel.write(request1);
+ clientChannel.write(request2);
+
+ responsesIn.await(RESPONSE_TIMEOUT, MILLISECONDS);
+
+ assertTrue(responses.contains(SOME_RESPONSE_TEXT + PATH1));
+ assertTrue(responses.contains(SOME_RESPONSE_TEXT + PATH2));
+ }
+
+ public class ClientHandler extends SimpleChannelUpstreamHandler {
+ @Override
+ public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) {
+ final Object message = e.getMessage();
+ if (message instanceof HttpChunk) {
+ final HttpChunk response = (HttpChunk) e.getMessage();
+ if (!response.isLast()) {
+ final String content = response.getContent().toString(UTF_8);
+ responses.add(content);
+ if (content.equals(SOME_RESPONSE_TEXT + PATH2)) {
+ responsesIn.countDown();
+ }
+ }
+ }
+ }
+ }
+
+ public class ServerHandler extends SimpleChannelUpstreamHandler {
+ private final AtomicBoolean sendFinalChunk = new AtomicBoolean(false);
+
+ @Override
+ public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws InterruptedException {
+ final HttpRequest request = (HttpRequest) e.getMessage();
+
+ final OrderedUpstreamMessageEvent oue = (OrderedUpstreamMessageEvent) e;
+ final String uri = request.getUri();
+
+ final HttpResponse initialChunk = new DefaultHttpResponse(HTTP_1_1, OK);
+ initialChunk.headers().add(CONTENT_TYPE, CONTENT_TYPE_TEXT);
+ initialChunk.headers().add(CONNECTION, KEEP_ALIVE);
+ initialChunk.headers().add(TRANSFER_ENCODING, CHUNKED);
+
+ ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, false, initialChunk));
+
+ timer.newTimeout(new ChunkWriter(ctx, e, uri, oue, 1), 0, MILLISECONDS);
+ }
+
+ private class ChunkWriter implements TimerTask {
+ private final ChannelHandlerContext ctx;
+ private final MessageEvent e;
+ private final String uri;
+ private final OrderedUpstreamMessageEvent oue;
+ private final int subSequence;
+
+ public ChunkWriter(final ChannelHandlerContext ctx, final MessageEvent e, final String uri,
+ final OrderedUpstreamMessageEvent oue, final int subSequence) {
+ this.ctx = ctx;
+ this.e = e;
+ this.uri = uri;
+ this.oue = oue;
+ this.subSequence = subSequence;
+ }
+
+ @Override
+ public void run(final Timeout timeout) {
+ if (sendFinalChunk.get() && subSequence > 1) {
+ final HttpChunk finalChunk = new DefaultHttpChunk(EMPTY_BUFFER);
+ ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, subSequence, true, finalChunk));
+ } else {
+ final HttpChunk chunk = new DefaultHttpChunk(copiedBuffer(SOME_RESPONSE_TEXT + uri, UTF_8));
+ ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, subSequence, false, chunk));
+
+ timer.newTimeout(new ChunkWriter(ctx, e, uri, oue, subSequence + 1), 0, MILLISECONDS);
+
+ if (uri.equals(PATH2)) {
+ sendFinalChunk.set(true);
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
index ba0abedc88b..7a1b072d480 100644
--- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
+++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
@@ -1594,6 +1594,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
int numClientNodes = InternalTestCluster.DEFAULT_NUM_CLIENT_NODES;
boolean enableRandomBenchNodes = InternalTestCluster.DEFAULT_ENABLE_RANDOM_BENCH_NODES;
+ boolean enableHttpPipelining = InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING;
int minNumDataNodes = InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES;
int maxNumDataNodes = InternalTestCluster.DEFAULT_MAX_NUM_DATA_NODES;
SettingsSource settingsSource = InternalTestCluster.DEFAULT_SETTINGS_SOURCE;
@@ -1660,7 +1661,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
}
return new InternalTestCluster(seed, minNumDataNodes, maxNumDataNodes,
clusterName(scope.name(), Integer.toString(CHILD_JVM_ID), seed), settingsSource, numClientNodes,
- enableRandomBenchNodes, CHILD_JVM_ID, nodePrefix);
+ enableRandomBenchNodes, enableHttpPipelining, CHILD_JVM_ID, nodePrefix);
}
/**
diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java
index 11ccd207b93..152454f0779 100644
--- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java
@@ -152,6 +152,7 @@ public final class InternalTestCluster extends TestCluster {
static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1;
static final boolean DEFAULT_ENABLE_RANDOM_BENCH_NODES = true;
+ static final boolean DEFAULT_ENABLE_HTTP_PIPELINING = true;
public static final String NODE_MODE = nodeMode();
@@ -193,13 +194,13 @@ public final class InternalTestCluster extends TestCluster {
private ServiceDisruptionScheme activeDisruptionScheme;
public InternalTestCluster(long clusterSeed, int minNumDataNodes, int maxNumDataNodes, String clusterName, int numClientNodes, boolean enableRandomBenchNodes,
- int jvmOrdinal, String nodePrefix) {
- this(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
+ boolean enableHttpPipelining, int jvmOrdinal, String nodePrefix) {
+ this(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
}
public InternalTestCluster(long clusterSeed,
int minNumDataNodes, int maxNumDataNodes, String clusterName, SettingsSource settingsSource, int numClientNodes,
- boolean enableRandomBenchNodes,
+ boolean enableRandomBenchNodes, boolean enableHttpPipelining,
int jvmOrdinal, String nodePrefix) {
super(clusterSeed);
this.clusterName = clusterName;
@@ -267,6 +268,7 @@ public final class InternalTestCluster extends TestCluster {
builder.put("config.ignore_system_properties", true);
builder.put("node.mode", NODE_MODE);
builder.put("script.disable_dynamic", false);
+ builder.put("http.pipelining", enableHttpPipelining);
builder.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false);
if (Strings.hasLength(System.getProperty("es.logger.level"))) {
builder.put("logger.level", System.getProperty("es.logger.level"));
diff --git a/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java
index 78bdb00ab5b..203f53a2d64 100644
--- a/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java
+++ b/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java
@@ -50,11 +50,12 @@ public class InternalTestClusterTests extends ElasticsearchTestCase {
SettingsSource settingsSource = SettingsSource.EMPTY;
int numClientNodes = randomIntBetween(0, 10);
boolean enableRandomBenchNodes = randomBoolean();
+ boolean enableHttpPipelining = randomBoolean();
int jvmOrdinal = randomIntBetween(0, 10);
String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10);
- InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
- InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
+ InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
+ InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
assertClusters(cluster0, cluster1, true);
}
@@ -94,11 +95,12 @@ public class InternalTestClusterTests extends ElasticsearchTestCase {
SettingsSource settingsSource = SettingsSource.EMPTY;
int numClientNodes = randomIntBetween(0, 2);
boolean enableRandomBenchNodes = randomBoolean();
+ boolean enableHttpPipelining = randomBoolean();
int jvmOrdinal = randomIntBetween(0, 10);
String nodePrefix = "foobar";
- InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
- InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
+ InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
+ InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
assertClusters(cluster0, cluster1, false);
long seed = randomLong();
diff --git a/src/test/java/org/elasticsearch/test/transport/NettyTransportMultiPortIntegrationTests.java b/src/test/java/org/elasticsearch/test/transport/NettyTransportMultiPortIntegrationTests.java
index 40983cddcb2..c8baa80e3fa 100644
--- a/src/test/java/org/elasticsearch/test/transport/NettyTransportMultiPortIntegrationTests.java
+++ b/src/test/java/org/elasticsearch/test/transport/NettyTransportMultiPortIntegrationTests.java
@@ -18,7 +18,6 @@
*/
package org.elasticsearch.test.transport;
-import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.transport.TransportClient;
diff --git a/src/test/java/org/elasticsearch/tribe/TribeTests.java b/src/test/java/org/elasticsearch/tribe/TribeTests.java
index e4bf9e05c60..c30dd92b6b9 100644
--- a/src/test/java/org/elasticsearch/tribe/TribeTests.java
+++ b/src/test/java/org/elasticsearch/tribe/TribeTests.java
@@ -64,7 +64,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
public static void setupSecondCluster() throws Exception {
ElasticsearchIntegrationTest.beforeClass();
// create another cluster
- cluster2 = new InternalTestCluster(randomLong(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, CHILD_JVM_ID, SECOND_CLUSTER_NODE_PREFIX);
+ cluster2 = new InternalTestCluster(randomLong(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, false, CHILD_JVM_ID, SECOND_CLUSTER_NODE_PREFIX);
cluster2.beforeTest(getRandom(), 0.1);
cluster2.ensureAtLeastNumDataNodes(2);
}