Netty: Add HTTP pipelining support
This adds HTTP pipelining support to Netty. Previously, pipelining was not supported due to the asynchronous nature of Elasticsearch: whichever response was computed first was returned first, regardless of the order of the requests. The solution is to add a handler to the Netty pipeline that maintains an ordered list and thus sorts the responses into request order before returning them to the client. This means we always keep some state on the server side, and some memory is required to hold the queued responses. Pipelining is enabled by default, but can be configured by setting the http.pipelining property to true|false. In addition, the maximum size of the event queue can be configured via http.pipelining.max_events. The initial Netty handler is copied from https://github.com/typesafehub/netty-http-pipelining Closes #2665
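To make the fix concrete, here is a hypothetical client-side sketch (plain JDK sockets, not part of this commit; host, port and paths are illustrative): it writes two requests back-to-back on one connection, and with pipelining enabled the responses come back in request order even when the first request is the slower one.

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class PipeliningDemo {
    public static void main(String[] args) throws Exception {
        // Assumes an Elasticsearch node listening on localhost:9200.
        try (Socket socket = new Socket("localhost", 9200)) {
            OutputStream out = socket.getOutputStream();
            // Two requests on one connection, written without waiting for the
            // first response (this is what HTTP pipelining means).
            String requests =
                    "GET /_cluster/state HTTP/1.1\r\nHost: localhost\r\n\r\n" +
                    "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n";
            out.write(requests.getBytes(StandardCharsets.UTF_8));
            out.flush();

            // With http.pipelining enabled, the /_cluster/state response is
            // always printed before the / response, matching the request order.
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                System.out.write(buffer, 0, read);
            }
        }
    }
}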
This commit is contained in:
parent
e56d85439c
commit
5eeac2fdf6
docs/reference/modules/http.asciidoc
@@ -15,8 +15,6 @@ when connecting for better performance and try to get your favorite
 client not to do
 http://en.wikipedia.org/wiki/Chunked_transfer_encoding[HTTP chunking].
 
-IMPORTANT: HTTP pipelining is not supported and should be disabled in your HTTP client.
-
 [float]
 === Settings
 
@@ -69,6 +67,9 @@ be cached for. Defaults to `1728000` (20 days)
 header should be returned. Note: This header is only returned, when the setting is
 set to `true`. Defaults to `false`
 
+|`http.pipelining` |Enable or disable HTTP pipelining, defaults to `true`.
+
+|`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`.
 
 |=======================================================================
 
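For illustration, a minimal sketch (not part of this commit) of supplying these settings programmatically when building node settings; it uses the ImmutableSettings builder that the tests further below also use, and the values shown are arbitrary:

import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

public class PipeliningSettingsExample {

    public static Settings pipeliningDisabled() {
        return ImmutableSettings.settingsBuilder()
                .put("http.pipelining", false)           // turn the ordering handler off
                .put("http.pipelining.max_events", 5000) // queue bound, only relevant when enabled
                .build();
    }
}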
pom.xml (7 lines changed)
@@ -1406,17 +1406,18 @@
                     <include>src/test/java/org/elasticsearch/**/*.java</include>
                 </includes>
                 <excludes>
-                    <exclude>src/main/java/org/elasticsearch/common/inject/**</exclude>
+                    <!-- Guice -->
+                    <exclude>src/main/java/org/elasticsearch/common/inject/**</exclude>
                     <exclude>src/main/java/org/elasticsearch/common/geo/GeoHashUtils.java</exclude>
                     <exclude>src/main/java/org/elasticsearch/common/lucene/search/XBooleanFilter.java</exclude>
                     <exclude>src/main/java/org/elasticsearch/common/lucene/search/XFilteredQuery.java</exclude>
                     <exclude>src/main/java/org/apache/lucene/queryparser/XSimpleQueryParser.java</exclude>
                     <exclude>src/main/java/org/apache/lucene/**/X*.java</exclude>
                     <!-- t-digest -->
-                    <exclude>src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java
-                    </exclude>
+                    <exclude>src/main/java/org/elasticsearch/search/aggregations/metrics/percentiles/tdigest/TDigestState.java</exclude>
                     <exclude>src/test/java/org/elasticsearch/search/aggregations/metrics/GroupTree.java</exclude>
+                    <!-- netty pipelining -->
+                    <exclude>src/main/java/org/elasticsearch/http/netty/pipelining/**</exclude>
                 </excludes>
             </configuration>
             <executions>
src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java
@@ -19,6 +19,7 @@
 package org.elasticsearch.http.netty;
 
+import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
 import org.elasticsearch.rest.support.RestUtils;
 import org.jboss.netty.channel.*;
 import org.jboss.netty.handler.codec.http.HttpRequest;
 
@@ -34,19 +35,33 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
 
     private final NettyHttpServerTransport serverTransport;
     private final Pattern corsPattern;
+    private final boolean httpPipeliningEnabled;
 
     public HttpRequestHandler(NettyHttpServerTransport serverTransport) {
         this.serverTransport = serverTransport;
         this.corsPattern = RestUtils.getCorsSettingRegex(serverTransport.settings());
+        this.httpPipeliningEnabled = serverTransport.pipelining;
     }
 
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
-        HttpRequest request = (HttpRequest) e.getMessage();
+        HttpRequest request;
+        OrderedUpstreamMessageEvent oue = null;
+        if (this.httpPipeliningEnabled && e instanceof OrderedUpstreamMessageEvent) {
+            oue = (OrderedUpstreamMessageEvent) e;
+            request = (HttpRequest) oue.getMessage();
+        } else {
+            request = (HttpRequest) e.getMessage();
+        }
+
         // the netty HTTP handling always copies over the buffer to its own buffer, either in NioWorker internally
         // when reading, or using a cumulation buffer
         NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());
-        serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), httpRequest, corsPattern));
+        if (oue != null) {
+            serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, corsPattern, oue));
+        } else {
+            serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, corsPattern));
+        }
        super.messageReceived(ctx, e);
     }
 
src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java
@@ -28,14 +28,14 @@ import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.netty.NettyUtils;
 import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
 import org.elasticsearch.http.HttpChannel;
+import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
+import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.support.RestUtils;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.*;
 import org.jboss.netty.handler.codec.http.*;
 
 import java.util.List;
@@ -61,16 +61,22 @@ public class NettyHttpChannel extends HttpChannel {
     private final NettyHttpServerTransport transport;
     private final Channel channel;
     private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest;
+    private OrderedUpstreamMessageEvent orderedUpstreamMessageEvent = null;
     private Pattern corsPattern;
 
-    public NettyHttpChannel(NettyHttpServerTransport transport, Channel channel, NettyHttpRequest request, Pattern corsPattern) {
+    public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, Pattern corsPattern) {
         super(request);
         this.transport = transport;
-        this.channel = channel;
+        this.channel = request.getChannel();
         this.nettyRequest = request.request();
         this.corsPattern = corsPattern;
     }
 
+    public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, Pattern corsPattern, OrderedUpstreamMessageEvent orderedUpstreamMessageEvent) {
+        this(transport, request, corsPattern);
+        this.orderedUpstreamMessageEvent = orderedUpstreamMessageEvent;
+    }
+
     @Override
     public BytesStreamOutput newBytesOutput() {
         return new ReleasableBytesStreamOutput(transport.bigArrays);
@@ -185,14 +191,25 @@ public class NettyHttpChannel extends HttpChannel {
                 }
             }
 
-            ChannelFuture future = channel.write(resp);
+            ChannelFuture future;
+
+            if (orderedUpstreamMessageEvent != null) {
+                OrderedDownstreamChannelEvent downstreamChannelEvent = new OrderedDownstreamChannelEvent(orderedUpstreamMessageEvent, 0, true, resp);
+                future = downstreamChannelEvent.getFuture();
+                channel.getPipeline().sendDownstream(downstreamChannelEvent);
+            } else {
+                future = channel.write(resp);
+            }
+
             if (response.contentThreadSafe() && content instanceof Releasable) {
                 future.addListener(new ReleaseChannelFutureListener((Releasable) content));
                 addedReleaseListener = true;
             }
 
             if (close) {
                 future.addListener(ChannelFutureListener.CLOSE);
             }
 
         } finally {
             if (!addedReleaseListener && content instanceof Releasable) {
                 ((Releasable) content).close();
src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java
@@ -145,6 +145,10 @@ public class NettyHttpRequest extends HttpRequest {
         return channel.getLocalAddress();
     }
 
+    public Channel getChannel() {
+        return channel;
+    }
+
     @Override
     public String header(String name) {
         return request.headers().get(name);
src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java
@@ -37,6 +37,7 @@
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.http.*;
+import org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 import org.elasticsearch.transport.BindTransportException;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 
@@ -72,6 +73,13 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport>
     public static final String SETTING_CORS_ALLOW_METHODS = "http.cors.allow-methods";
     public static final String SETTING_CORS_ALLOW_HEADERS = "http.cors.allow-headers";
     public static final String SETTING_CORS_ALLOW_CREDENTIALS = "http.cors.allow-credentials";
+    public static final String SETTING_PIPELINING = "http.pipelining";
+    public static final String SETTING_PIPELINING_MAX_EVENTS = "http.pipelining.max_events";
     public static final String SETTING_HTTP_COMPRESSION = "http.compression";
     public static final String SETTING_HTTP_COMPRESSION_LEVEL = "http.compression_level";
 
+    public static final boolean DEFAULT_SETTING_PIPELINING = true;
+    public static final int DEFAULT_SETTING_PIPELINING_MAX_EVENTS = 10000;
+
     private final NetworkService networkService;
     final BigArrays bigArrays;
 
@@ -85,6 +93,10 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport>
 
     private final boolean blockingServer;
 
+    final boolean pipelining;
+
+    private final int pipeliningMaxEvents;
+
     final boolean compression;
 
     private final int compressionLevel;
 
@@ -164,8 +176,10 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport>
             receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
         }
 
-        this.compression = settings.getAsBoolean("http.compression", false);
-        this.compressionLevel = settings.getAsInt("http.compression_level", 6);
+        this.compression = settings.getAsBoolean(SETTING_HTTP_COMPRESSION, false);
+        this.compressionLevel = settings.getAsInt(SETTING_HTTP_COMPRESSION_LEVEL, 6);
+        this.pipelining = settings.getAsBoolean(SETTING_PIPELINING, DEFAULT_SETTING_PIPELINING);
+        this.pipeliningMaxEvents = settings.getAsInt(SETTING_PIPELINING_MAX_EVENTS, DEFAULT_SETTING_PIPELINING_MAX_EVENTS);
 
         // validate max content length
         if (maxContentLength.bytes() > Integer.MAX_VALUE) {
 
@@ -174,8 +188,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport>
         }
         this.maxContentLength = maxContentLength;
 
-        logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], receive_predictor[{}->{}]",
-                maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax);
+        logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], receive_predictor[{}->{}], pipelining[{}], pipelining_max_events[{}]",
+                maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax, pipelining, pipeliningMaxEvents);
     }
 
     public Settings settings() {
 
@@ -370,6 +384,9 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport>
             if (transport.compression) {
                 pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
             }
+            if (transport.pipelining) {
+                pipeline.addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents));
+            }
             pipeline.addLast("handler", requestHandler);
             return pipeline;
         }
 
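For context, a hypothetical, self-contained sketch (not part of this commit; class names PipeliningServerSketch and EchoHandler are illustrative) of wiring HttpPipeliningHandler into a plain Netty 3 server pipeline, mirroring both the pipeline factory above and the unit test further below:

import org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler;
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

public class PipeliningServerSketch {
    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory());
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(
                        new HttpRequestDecoder(),
                        new HttpResponseEncoder(),
                        // turns HttpRequests into OrderedUpstreamMessageEvents and
                        // reorders the corresponding downstream events
                        new HttpPipeliningHandler(10000),
                        new EchoHandler());
            }
        });
        bootstrap.bind(new InetSocketAddress("127.0.0.1", 8080));
    }

    // Echoes the request URI; responses must be sent as OrderedDownstreamChannelEvents
    // so the pipelining handler can put them back into request order.
    static class EchoHandler extends SimpleChannelUpstreamHandler {
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
            // the pipelining handler wraps every HttpRequest in an ordered event
            OrderedUpstreamMessageEvent oue = (OrderedUpstreamMessageEvent) e;
            HttpRequest request = (HttpRequest) oue.getMessage();
            ChannelBuffer content = ChannelBuffers.copiedBuffer(request.getUri(), CharsetUtil.UTF_8);
            HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            response.headers().add(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes());
            response.setContent(content);
            ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, true, response));
        }
    }
}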
src/main/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandler.java (new file)
@@ -0,0 +1,109 @@
package org.elasticsearch.http.netty.pipelining;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequest;

import java.util.*;

/**
 * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their
 * corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will
 * cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely
 * OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects.
 *
 * @author Christopher Hunt
 */
public class HttpPipeliningHandler extends SimpleChannelHandler {

    public static final int INITIAL_EVENTS_HELD = 3;

    private final int maxEventsHeld;

    private int sequence;
    private int nextRequiredSequence;
    private int nextRequiredSubsequence;

    private final Queue<OrderedDownstreamChannelEvent> holdingQueue;

    /**
     * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel
     *                      connection. This is required as events cannot queue up indefinitely; we would run out of
     *                      memory if this was the case.
     */
    public HttpPipeliningHandler(final int maxEventsHeld) {
        this.maxEventsHeld = maxEventsHeld;

        // order held events by request sequence first, then by subsequence within a response
        holdingQueue = new PriorityQueue<>(INITIAL_EVENTS_HELD, new Comparator<OrderedDownstreamChannelEvent>() {
            @Override
            public int compare(OrderedDownstreamChannelEvent o1, OrderedDownstreamChannelEvent o2) {
                final int delta = o1.getOrderedUpstreamMessageEvent().getSequence() - o2.getOrderedUpstreamMessageEvent().getSequence();
                if (delta == 0) {
                    return o1.getSubsequence() - o2.getSubsequence();
                } else {
                    return delta;
                }
            }
        });
    }

    public int getMaxEventsHeld() {
        return maxEventsHeld;
    }

    @Override
    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) {
        final Object msg = e.getMessage();
        if (msg instanceof HttpRequest) {
            // stamp each incoming request with a monotonically increasing sequence number
            ctx.sendUpstream(new OrderedUpstreamMessageEvent(sequence++, e.getChannel(), msg, e.getRemoteAddress()));
        } else {
            ctx.sendUpstream(e);
        }
    }

    @Override
    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
            throws Exception {
        if (e instanceof OrderedDownstreamChannelEvent) {

            boolean channelShouldClose = false;

            synchronized (holdingQueue) {
                if (holdingQueue.size() < maxEventsHeld) {

                    final OrderedDownstreamChannelEvent currentEvent = (OrderedDownstreamChannelEvent) e;
                    holdingQueue.add(currentEvent);

                    // flush as many held events as are next in line
                    while (!holdingQueue.isEmpty()) {
                        final OrderedDownstreamChannelEvent nextEvent = holdingQueue.peek();

                        if (nextEvent.getOrderedUpstreamMessageEvent().getSequence() != nextRequiredSequence ||
                                nextEvent.getSubsequence() != nextRequiredSubsequence) {
                            break;
                        }
                        holdingQueue.remove();
                        ctx.sendDownstream(nextEvent.getChannelEvent());
                        if (nextEvent.isLast()) {
                            ++nextRequiredSequence;
                            nextRequiredSubsequence = 0;
                        } else {
                            ++nextRequiredSubsequence;
                        }
                    }

                } else {
                    channelShouldClose = true;
                }
            }

            if (channelShouldClose) {
                Channels.close(e.getChannel());
            }
        } else {
            super.handleDownstream(ctx, e);
        }
    }

}
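The core of the ordering logic above can be shown without any Netty machinery. A standalone, hypothetical sketch (names are illustrative, not from this commit): responses carry their request's sequence number, sit in a priority queue, and are only released while the head of the queue carries the next required sequence.

import java.util.PriorityQueue;
import java.util.Queue;

public class OrderingSketch {

    static class Response {
        final int sequence;   // sequence number stamped on the originating request
        final String body;

        Response(int sequence, String body) {
            this.sequence = sequence;
            this.body = body;
        }
    }

    private final Queue<Response> holdingQueue = new PriorityQueue<>(
            (a, b) -> a.sequence - b.sequence);
    private int nextRequiredSequence = 0;

    // Called whenever a response is ready, possibly out of order.
    void onResponse(Response response) {
        holdingQueue.add(response);
        // Only emit while the head of the queue is the response we are waiting for.
        while (!holdingQueue.isEmpty() && holdingQueue.peek().sequence == nextRequiredSequence) {
            System.out.println("emit: " + holdingQueue.remove().body);
            nextRequiredSequence++;
        }
    }

    public static void main(String[] args) {
        OrderingSketch sketch = new OrderingSketch();
        sketch.onResponse(new Response(1, "second")); // finished first, held back
        sketch.onResponse(new Response(0, "first"));  // releases both, in order
    }
}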
src/main/java/org/elasticsearch/http/netty/pipelining/OrderedDownstreamChannelEvent.java (new file)
@@ -0,0 +1,77 @@
package org.elasticsearch.http.netty.pipelining;

import org.jboss.netty.channel.*;

/**
 * Permits downstream channel events to be ordered and signalled as to whether more are to come for a given sequence.
 *
 * @author Christopher Hunt
 */
public class OrderedDownstreamChannelEvent implements ChannelEvent {

    final ChannelEvent ce;
    final OrderedUpstreamMessageEvent oue;
    final int subsequence;
    final boolean last;

    /**
     * Construct a downstream channel event for all types of events.
     *
     * @param oue         the OrderedUpstreamMessageEvent that this response is associated with
     * @param subsequence the position of this event within the sequence of events making up the response
     * @param last        when set to true this indicates that there are no more responses to be received for the
     *                    original OrderedUpstreamMessageEvent
     */
    public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oue, final int subsequence, boolean last,
                                         final ChannelEvent ce) {
        this.oue = oue;
        this.ce = ce;
        this.subsequence = subsequence;
        this.last = last;
    }

    /**
     * Convenience constructor signifying that this downstream message event is the last one for the given sequence,
     * and that there is only one response.
     */
    public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oe,
                                         final Object message) {
        this(oe, 0, true, message);
    }

    /**
     * Convenience constructor for passing message events.
     */
    public OrderedDownstreamChannelEvent(final OrderedUpstreamMessageEvent oue, final int subsequence, boolean last,
                                         final Object message) {
        this(oue, subsequence, last, new DownstreamMessageEvent(oue.getChannel(), Channels.future(oue.getChannel()),
                message, oue.getRemoteAddress()));
    }

    public OrderedUpstreamMessageEvent getOrderedUpstreamMessageEvent() {
        return oue;
    }

    public int getSubsequence() {
        return subsequence;
    }

    public boolean isLast() {
        return last;
    }

    @Override
    public Channel getChannel() {
        return ce.getChannel();
    }

    @Override
    public ChannelFuture getFuture() {
        return ce.getFuture();
    }

    public ChannelEvent getChannelEvent() {
        return ce;
    }
}
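To illustrate subsequences, here is a hypothetical server-side fragment (mirroring the ServerHandler in the pipelining unit test further below; the class and method names are illustrative) that streams one logical response as three ordered events for a single request:

import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;

import static org.jboss.netty.buffer.ChannelBuffers.EMPTY_BUFFER;
import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.TRANSFER_ENCODING;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CHUNKED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.jboss.netty.util.CharsetUtil.UTF_8;

public class ChunkedResponseSketch {

    // One logical response emitted as three events for the request carried by oue:
    // the subsequence numbers (0, 1, 2) order the parts within this response, and
    // last=true on the final event tells HttpPipeliningHandler to advance to the
    // next request's sequence.
    static void writeChunkedResponse(ChannelHandlerContext ctx, OrderedUpstreamMessageEvent oue) {
        HttpResponse header = new DefaultHttpResponse(HTTP_1_1, OK);
        header.headers().add(TRANSFER_ENCODING, CHUNKED);
        ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, false, header));
        ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 1, false,
                new DefaultHttpChunk(copiedBuffer("hello", UTF_8))));
        ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 2, true,
                new DefaultHttpChunk(EMPTY_BUFFER)));
    }
}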
src/main/java/org/elasticsearch/http/netty/pipelining/OrderedUpstreamMessageEvent.java (new file)
@@ -0,0 +1,25 @@
package org.elasticsearch.http.netty.pipelining;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.UpstreamMessageEvent;

import java.net.SocketAddress;

/**
 * Permits upstream message events to be ordered.
 *
 * @author Christopher Hunt
 */
public class OrderedUpstreamMessageEvent extends UpstreamMessageEvent {
    final int sequence;

    public OrderedUpstreamMessageEvent(final int sequence, final Channel channel, final Object msg, final SocketAddress remoteAddress) {
        super(channel, msg, remoteAddress);
        this.sequence = sequence;
    }

    public int getSequence() {
        return sequence;
    }

}
src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java (new file)
@@ -0,0 +1,163 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.http.netty;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;

import java.io.Closeable;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.HOST;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 * Tiny helper to send http requests over netty.
 */
public class NettyHttpClient implements Closeable {

    private static final Function<? super HttpResponse, String> FUNCTION_RESPONSE_TO_CONTENT = new Function<HttpResponse, String>() {
        @Override
        public String apply(HttpResponse response) {
            return response.getContent().toString(Charsets.UTF_8);
        }
    };

    private static final Function<? super HttpResponse, String> FUNCTION_RESPONSE_OPAQUE_ID = new Function<HttpResponse, String>() {
        @Override
        public String apply(HttpResponse response) {
            return response.headers().get("X-Opaque-Id");
        }
    };

    public static Collection<String> returnHttpResponseBodies(Collection<HttpResponse> responses) {
        return Collections2.transform(responses, FUNCTION_RESPONSE_TO_CONTENT);
    }

    public static Collection<String> returnOpaqueIds(Collection<HttpResponse> responses) {
        return Collections2.transform(responses, FUNCTION_RESPONSE_OPAQUE_ID);
    }

    private final ClientBootstrap clientBootstrap;

    public NettyHttpClient() {
        clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());
    }

    public Collection<HttpResponse> sendRequests(SocketAddress remoteAddress, String... uris) throws InterruptedException {
        return sendRequests(remoteAddress, -1, uris);
    }

    public synchronized Collection<HttpResponse> sendRequests(SocketAddress remoteAddress, long expectedMaxDuration, String... uris) throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(uris.length);
        final Collection<HttpResponse> content = Collections.synchronizedList(new ArrayList<HttpResponse>(uris.length));

        clientBootstrap.setPipelineFactory(new CountDownLatchPipelineFactory(latch, content));

        ChannelFuture channelFuture = null;
        try {
            channelFuture = clientBootstrap.connect(remoteAddress);
            channelFuture.await(1000);

            long startTime = System.currentTimeMillis();

            for (int i = 0; i < uris.length; i++) {
                final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
                httpRequest.headers().add(HOST, "localhost");
                httpRequest.headers().add("X-Opaque-ID", String.valueOf(i));
                channelFuture.getChannel().write(httpRequest);
            }
            latch.await();

            long duration = System.currentTimeMillis() - startTime;
            // make sure the requests were executed in parallel
            if (expectedMaxDuration > 0) {
                assertThat(duration, is(lessThan(expectedMaxDuration)));
            }
        } finally {
            if (channelFuture != null) {
                channelFuture.getChannel().close();
            }
        }

        return content;
    }

    @Override
    public void close() {
        clientBootstrap.shutdown();
        clientBootstrap.releaseExternalResources();
    }

    /**
     * Helper factory which adds returned data to a list and uses a count down latch to decide when done.
     */
    public static class CountDownLatchPipelineFactory implements ChannelPipelineFactory {
        private final CountDownLatch latch;
        private final Collection<HttpResponse> content;

        public CountDownLatchPipelineFactory(CountDownLatch latch, Collection<HttpResponse> content) {
            this.latch = latch;
            this.content = content;
        }

        @Override
        public ChannelPipeline getPipeline() throws Exception {
            final int maxBytes = new ByteSizeValue(100, ByteSizeUnit.MB).bytesAsInt();
            return Channels.pipeline(
                    new HttpClientCodec(),
                    new HttpChunkAggregator(maxBytes),
                    new SimpleChannelUpstreamHandler() {
                        @Override
                        public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) {
                            final Object message = e.getMessage();

                            if (message instanceof HttpResponse) {
                                HttpResponse response = (HttpResponse) message;
                                content.add(response);
                            }

                            latch.countDown();
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
                            super.exceptionCaught(ctx, e);
                            latch.countDown();
                        }
                    });
        }
    }

}
src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTest.java (new file)
@@ -0,0 +1,227 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.http.netty;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.cache.recycler.MockBigArrays;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.http.netty.NettyHttpClient.returnHttpResponseBodies;
import static org.elasticsearch.http.netty.NettyHttpServerTransport.HttpChannelPipelineFactory;
import static org.hamcrest.Matchers.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CLOSE;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_0;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
 * This test just checks whether pipelining works in general, without any connection to the Elasticsearch request handler.
 */
public class NettyHttpServerPipeliningTest extends ElasticsearchTestCase {

    private NetworkService networkService;
    private ThreadPool threadPool;
    private MockPageCacheRecycler mockPageCacheRecycler;
    private MockBigArrays bigArrays;
    private CustomNettyHttpServerTransport httpServerTransport;

    @Before
    public void setup() throws Exception {
        networkService = new NetworkService(ImmutableSettings.EMPTY);
        threadPool = new ThreadPool("test");
        mockPageCacheRecycler = new MockPageCacheRecycler(ImmutableSettings.EMPTY, threadPool);
        bigArrays = new MockBigArrays(ImmutableSettings.EMPTY, mockPageCacheRecycler, new NoneCircuitBreakerService());
    }

    @After
    public void shutdown() throws Exception {
        if (threadPool != null) {
            threadPool.shutdownNow();
        }
        if (httpServerTransport != null) {
            httpServerTransport.close();
        }
    }

    @Test
    @TestLogging("_root:DEBUG")
    public void testThatHttpPipeliningWorksWhenEnabled() throws Exception {
        Settings settings = settingsBuilder().put("http.pipelining", true).build();
        httpServerTransport = new CustomNettyHttpServerTransport(settings);
        httpServerTransport.start();
        InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();

        List<String> requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast");
        long maxdurationInMilliSeconds = 1200;
        try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
            Collection<HttpResponse> responses = nettyHttpClient.sendRequests(transportAddress.address(), maxdurationInMilliSeconds, requests.toArray(new String[]{}));
            Collection<String> responseBodies = returnHttpResponseBodies(responses);
            assertThat(responseBodies, contains("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast"));
        }
    }

    @Test
    @TestLogging("_root:TRACE")
    public void testThatHttpPipeliningCanBeDisabled() throws Exception {
        Settings settings = settingsBuilder().put("http.pipelining", false).build();
        httpServerTransport = new CustomNettyHttpServerTransport(settings);
        httpServerTransport.start();
        InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();

        List<String> requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500");
        long maxdurationInMilliSeconds = 1200;
        try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
            Collection<HttpResponse> responses = nettyHttpClient.sendRequests(transportAddress.address(), maxdurationInMilliSeconds, requests.toArray(new String[]{}));
            List<String> responseBodies = Lists.newArrayList(returnHttpResponseBodies(responses));
            // we cannot be sure about the order of the fast requests, but the slow ones should be last
            assertThat(responseBodies, hasSize(5));
            assertThat(responseBodies.get(3), is("/slow?sleep=500"));
            assertThat(responseBodies.get(4), is("/slow?sleep=1000"));
        }
    }

    class CustomNettyHttpServerTransport extends NettyHttpServerTransport {

        private final ExecutorService executorService;

        public CustomNettyHttpServerTransport(Settings settings) {
            super(settings, NettyHttpServerPipeliningTest.this.networkService, NettyHttpServerPipeliningTest.this.bigArrays);
            this.executorService = Executors.newFixedThreadPool(5);
        }

        @Override
        public ChannelPipelineFactory configureServerChannelPipelineFactory() {
            return new CustomHttpChannelPipelineFactory(this, executorService);
        }

        @Override
        public HttpServerTransport stop() throws ElasticsearchException {
            executorService.shutdownNow();
            return super.stop();
        }
    }

    private class CustomHttpChannelPipelineFactory extends HttpChannelPipelineFactory {

        private final ExecutorService executorService;

        public CustomHttpChannelPipelineFactory(NettyHttpServerTransport transport, ExecutorService executorService) {
            super(transport);
            this.executorService = executorService;
        }

        @Override
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = super.getPipeline();
            pipeline.replace("handler", "handler", new PossiblySlowUpstreamHandler(executorService));
            return pipeline;
        }
    }

    class PossiblySlowUpstreamHandler extends SimpleChannelUpstreamHandler {

        private final ExecutorService executorService;

        public PossiblySlowUpstreamHandler(ExecutorService executorService) {
            this.executorService = executorService;
        }

        @Override
        public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
            executorService.submit(new PossiblySlowRunnable(ctx, e));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            e.getCause().printStackTrace();
            e.getChannel().close();
        }
    }

    class PossiblySlowRunnable implements Runnable {

        private ChannelHandlerContext ctx;
        private MessageEvent e;

        public PossiblySlowRunnable(ChannelHandlerContext ctx, MessageEvent e) {
            this.ctx = ctx;
            this.e = e;
        }

        @Override
        public void run() {
            HttpRequest request;
            OrderedUpstreamMessageEvent oue = null;
            if (e instanceof OrderedUpstreamMessageEvent) {
                oue = (OrderedUpstreamMessageEvent) e;
                request = (HttpRequest) oue.getMessage();
            } else {
                request = (HttpRequest) e.getMessage();
            }

            ChannelBuffer buffer = ChannelBuffers.copiedBuffer(request.getUri(), Charsets.UTF_8);

            DefaultHttpResponse httpResponse = new DefaultHttpResponse(HTTP_1_1, OK);
            httpResponse.headers().add(CONTENT_LENGTH, buffer.readableBytes());
            httpResponse.setContent(buffer);

            QueryStringDecoder decoder = new QueryStringDecoder(request.getUri());

            final int timeout = request.getUri().startsWith("/slow") && decoder.getParameters().containsKey("sleep") ? Integer.valueOf(decoder.getParameters().get("sleep").get(0)) : 0;
            if (timeout > 0) {
                sleep(timeout);
            }

            if (oue != null) {
                ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, true, httpResponse));
            } else {
                ctx.getChannel().write(httpResponse);
            }
        }
    }
}
src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIntegrationTest.java (new file)
@@ -0,0 +1,78 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.http.netty;

import com.google.common.collect.Lists;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.*;

/**
 *
 */
@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
public class NettyPipeliningDisabledIntegrationTest extends ElasticsearchIntegrationTest {

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put(InternalNode.HTTP_ENABLED, true).put("http.pipelining", false).build();
    }

    @Test
    public void testThatNettyHttpServerDoesNotSupportPipelining() throws Exception {
        ensureGreen();
        List<String> requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/");

        HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
        InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();

        try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
            Collection<HttpResponse> responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
            assertThat(responses, hasSize(requests.size()));

            List<String> opaqueIds = Lists.newArrayList(returnOpaqueIds(responses));

            assertResponsesOutOfOrder(opaqueIds);
        }
    }

    /**
     * Checks if all responses are there, but also tests that they are out of order because pipelining is disabled.
     */
    private void assertResponsesOutOfOrder(List<String> opaqueIds) {
        String message = String.format(Locale.ROOT, "Expected returned http message ids to be out of order: %s", opaqueIds);
        assertThat(opaqueIds, hasItems("0", "1", "2", "3", "4", "5", "6"));
        assertThat(message, opaqueIds, not(contains("0", "1", "2", "3", "4", "5", "6")));
    }
}
src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIntegrationTest.java (new file)
@@ -0,0 +1,75 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.http.netty;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;


@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
public class NettyPipeliningEnabledIntegrationTest extends ElasticsearchIntegrationTest {

    @Override
    protected Settings nodeSettings(int nodeOrdinal) {
        return settingsBuilder().put(super.nodeSettings(nodeOrdinal)).put(InternalNode.HTTP_ENABLED, true).put("http.pipelining", true).build();
    }

    @Test
    public void testThatNettyHttpServerSupportsPipelining() throws Exception {
        List<String> requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/");

        HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
        InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) httpServerTransport.boundAddress().boundAddress();

        try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
            Collection<HttpResponse> responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
            assertThat(responses, hasSize(5));

            Collection<String> opaqueIds = returnOpaqueIds(responses);
            assertOpaqueIdsInOrder(opaqueIds);
        }
    }

    private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
        // check if opaque ids are monotonically increasing
        int i = 0;
        String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be monotonically increasing, got [%s]", opaqueIds);
        for (String opaqueId : opaqueIds) {
            assertThat(msg, opaqueId, is(String.valueOf(i++)));
        }
    }

}
src/test/java/org/elasticsearch/http/netty/pipelining/HttpPipeliningHandlerTest.java (new file)
@@ -0,0 +1,215 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.http.netty.pipelining;

import org.elasticsearch.test.ElasticsearchTestCase;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.jboss.netty.buffer.ChannelBuffers.EMPTY_BUFFER;
import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.CHUNKED;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.jboss.netty.util.CharsetUtil.UTF_8;

/**
 *
 */
public class HttpPipeliningHandlerTest extends ElasticsearchTestCase {

    private static final long RESPONSE_TIMEOUT = 10000L;
    private static final long CONNECTION_TIMEOUT = 10000L;
    private static final String CONTENT_TYPE_TEXT = "text/plain; charset=UTF-8";
    // TODO make me random
    private static final InetSocketAddress HOST_ADDR = new InetSocketAddress("127.0.0.1", 9080);
    private static final String PATH1 = "/1";
    private static final String PATH2 = "/2";
    private static final String SOME_RESPONSE_TEXT = "some response for ";

    private ClientBootstrap clientBootstrap;
    private ServerBootstrap serverBootstrap;

    private CountDownLatch responsesIn;
    private final List<String> responses = new ArrayList<>(2);

    private HashedWheelTimer timer;

    @Before
    public void startBootstraps() {
        clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());

        clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(
                        new HttpClientCodec(),
                        new ClientHandler()
                );
            }
        });

        serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory());

        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(
                        new HttpRequestDecoder(),
                        new HttpResponseEncoder(),
                        new HttpPipeliningHandler(10000),
                        new ServerHandler()
                );
            }
        });

        serverBootstrap.bind(HOST_ADDR);

        timer = new HashedWheelTimer();
    }

    @After
    public void releaseResources() {
        timer.stop();

        serverBootstrap.shutdown();
        serverBootstrap.releaseExternalResources();
        clientBootstrap.shutdown();
        clientBootstrap.releaseExternalResources();
    }

    @Test
    public void shouldReturnMessagesInOrder() throws InterruptedException {
        responsesIn = new CountDownLatch(1);
        responses.clear();

        final ChannelFuture connectionFuture = clientBootstrap.connect(HOST_ADDR);

        assertTrue(connectionFuture.await(CONNECTION_TIMEOUT));
        final Channel clientChannel = connectionFuture.getChannel();

        final HttpRequest request1 = new DefaultHttpRequest(
                HTTP_1_1, HttpMethod.GET, PATH1);
        request1.headers().add(HOST, HOST_ADDR.toString());

        final HttpRequest request2 = new DefaultHttpRequest(
                HTTP_1_1, HttpMethod.GET, PATH2);
        request2.headers().add(HOST, HOST_ADDR.toString());

        clientChannel.write(request1);
        clientChannel.write(request2);

        responsesIn.await(RESPONSE_TIMEOUT, MILLISECONDS);

        assertTrue(responses.contains(SOME_RESPONSE_TEXT + PATH1));
        assertTrue(responses.contains(SOME_RESPONSE_TEXT + PATH2));
    }

    public class ClientHandler extends SimpleChannelUpstreamHandler {
        @Override
        public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) {
            final Object message = e.getMessage();
            if (message instanceof HttpChunk) {
                final HttpChunk response = (HttpChunk) e.getMessage();
                if (!response.isLast()) {
                    final String content = response.getContent().toString(UTF_8);
                    responses.add(content);
                    if (content.equals(SOME_RESPONSE_TEXT + PATH2)) {
                        responsesIn.countDown();
                    }
                }
            }
        }
    }

    public class ServerHandler extends SimpleChannelUpstreamHandler {
        private final AtomicBoolean sendFinalChunk = new AtomicBoolean(false);

        @Override
        public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws InterruptedException {
            final HttpRequest request = (HttpRequest) e.getMessage();

            final OrderedUpstreamMessageEvent oue = (OrderedUpstreamMessageEvent) e;
            final String uri = request.getUri();

            final HttpResponse initialChunk = new DefaultHttpResponse(HTTP_1_1, OK);
            initialChunk.headers().add(CONTENT_TYPE, CONTENT_TYPE_TEXT);
            initialChunk.headers().add(CONNECTION, KEEP_ALIVE);
            initialChunk.headers().add(TRANSFER_ENCODING, CHUNKED);

            ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, 0, false, initialChunk));

            timer.newTimeout(new ChunkWriter(ctx, e, uri, oue, 1), 0, MILLISECONDS);
        }

        private class ChunkWriter implements TimerTask {
            private final ChannelHandlerContext ctx;
            private final MessageEvent e;
            private final String uri;
            private final OrderedUpstreamMessageEvent oue;
            private final int subSequence;

            public ChunkWriter(final ChannelHandlerContext ctx, final MessageEvent e, final String uri,
                               final OrderedUpstreamMessageEvent oue, final int subSequence) {
                this.ctx = ctx;
                this.e = e;
                this.uri = uri;
                this.oue = oue;
                this.subSequence = subSequence;
            }

            @Override
            public void run(final Timeout timeout) {
                if (sendFinalChunk.get() && subSequence > 1) {
                    final HttpChunk finalChunk = new DefaultHttpChunk(EMPTY_BUFFER);
                    ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, subSequence, true, finalChunk));
                } else {
                    final HttpChunk chunk = new DefaultHttpChunk(copiedBuffer(SOME_RESPONSE_TEXT + uri, UTF_8));
                    ctx.sendDownstream(new OrderedDownstreamChannelEvent(oue, subSequence, false, chunk));

                    timer.newTimeout(new ChunkWriter(ctx, e, uri, oue, subSequence + 1), 0, MILLISECONDS);

                    if (uri.equals(PATH2)) {
                        sendFinalChunk.set(true);
                    }
                }
            }
        }
    }
}
src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java
@@ -1594,6 +1594,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
     protected TestCluster buildTestCluster(Scope scope, long seed) throws IOException {
         int numClientNodes = InternalTestCluster.DEFAULT_NUM_CLIENT_NODES;
         boolean enableRandomBenchNodes = InternalTestCluster.DEFAULT_ENABLE_RANDOM_BENCH_NODES;
+        boolean enableHttpPipelining = InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING;
         int minNumDataNodes = InternalTestCluster.DEFAULT_MIN_NUM_DATA_NODES;
         int maxNumDataNodes = InternalTestCluster.DEFAULT_MAX_NUM_DATA_NODES;
         SettingsSource settingsSource = InternalTestCluster.DEFAULT_SETTINGS_SOURCE;
 
@@ -1660,7 +1661,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
         }
         return new InternalTestCluster(seed, minNumDataNodes, maxNumDataNodes,
                 clusterName(scope.name(), Integer.toString(CHILD_JVM_ID), seed), settingsSource, numClientNodes,
-                enableRandomBenchNodes, CHILD_JVM_ID, nodePrefix);
+                enableRandomBenchNodes, enableHttpPipelining, CHILD_JVM_ID, nodePrefix);
     }
 
     /**
src/test/java/org/elasticsearch/test/InternalTestCluster.java
@@ -152,6 +152,7 @@ public final class InternalTestCluster extends TestCluster {
     static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1;
 
     static final boolean DEFAULT_ENABLE_RANDOM_BENCH_NODES = true;
+    static final boolean DEFAULT_ENABLE_HTTP_PIPELINING = true;
 
     public static final String NODE_MODE = nodeMode();
 
@@ -193,13 +194,13 @@ public final class InternalTestCluster extends TestCluster {
     private ServiceDisruptionScheme activeDisruptionScheme;
 
     public InternalTestCluster(long clusterSeed, int minNumDataNodes, int maxNumDataNodes, String clusterName, int numClientNodes, boolean enableRandomBenchNodes,
-                               int jvmOrdinal, String nodePrefix) {
-        this(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
+                               boolean enableHttpPipelining, int jvmOrdinal, String nodePrefix) {
+        this(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, DEFAULT_SETTINGS_SOURCE, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
     }
 
     public InternalTestCluster(long clusterSeed,
                                int minNumDataNodes, int maxNumDataNodes, String clusterName, SettingsSource settingsSource, int numClientNodes,
-                               boolean enableRandomBenchNodes,
+                               boolean enableRandomBenchNodes, boolean enableHttpPipelining,
                                int jvmOrdinal, String nodePrefix) {
         super(clusterSeed);
         this.clusterName = clusterName;
 
@@ -267,6 +268,7 @@ public final class InternalTestCluster extends TestCluster {
         builder.put("config.ignore_system_properties", true);
         builder.put("node.mode", NODE_MODE);
         builder.put("script.disable_dynamic", false);
+        builder.put("http.pipelining", enableHttpPipelining);
         builder.put("plugins." + PluginsService.LOAD_PLUGIN_FROM_CLASSPATH, false);
         if (Strings.hasLength(System.getProperty("es.logger.level"))) {
             builder.put("logger.level", System.getProperty("es.logger.level"));
src/test/java/org/elasticsearch/test/InternalTestClusterTests.java
@@ -50,11 +50,12 @@ public class InternalTestClusterTests extends ElasticsearchTestCase {
         SettingsSource settingsSource = SettingsSource.EMPTY;
         int numClientNodes = randomIntBetween(0, 10);
         boolean enableRandomBenchNodes = randomBoolean();
+        boolean enableHttpPipelining = randomBoolean();
         int jvmOrdinal = randomIntBetween(0, 10);
         String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10);
 
-        InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
-        InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
+        InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
+        InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
         assertClusters(cluster0, cluster1, true);
 
     }
 
@@ -94,11 +95,12 @@ public class InternalTestClusterTests extends ElasticsearchTestCase {
         SettingsSource settingsSource = SettingsSource.EMPTY;
         int numClientNodes = randomIntBetween(0, 2);
         boolean enableRandomBenchNodes = randomBoolean();
+        boolean enableHttpPipelining = randomBoolean();
         int jvmOrdinal = randomIntBetween(0, 10);
         String nodePrefix = "foobar";
 
-        InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
-        InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableRandomBenchNodes, jvmOrdinal, nodePrefix);
+        InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
+        InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, minNumDataNodes, maxNumDataNodes, clusterName1, settingsSource, numClientNodes, enableRandomBenchNodes, enableHttpPipelining, jvmOrdinal, nodePrefix);
 
         assertClusters(cluster0, cluster1, false);
         long seed = randomLong();
@@ -18,7 +18,6 @@
  */
 package org.elasticsearch.test.transport;
 
-import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.client.transport.TransportClient;
src/test/java/org/elasticsearch/tribe/TribeTests.java
@@ -64,7 +64,7 @@ public class TribeTests extends ElasticsearchIntegrationTest {
     public static void setupSecondCluster() throws Exception {
         ElasticsearchIntegrationTest.beforeClass();
         // create another cluster
-        cluster2 = new InternalTestCluster(randomLong(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, CHILD_JVM_ID, SECOND_CLUSTER_NODE_PREFIX);
+        cluster2 = new InternalTestCluster(randomLong(), 2, 2, Strings.randomBase64UUID(getRandom()), 0, false, false, CHILD_JVM_ID, SECOND_CLUSTER_NODE_PREFIX);
         cluster2.beforeTest(getRandom(), 0.1);
         cluster2.ensureAtLeastNumDataNodes(2);
     }