Make http pipelining support mandatory (#30695)

This is related to #29500 and #28898. This commit removes the abilitiy
to disable http pipelining. After this commit, any elasticsearch node
will support pipelined requests from a client. Additionally, it extracts
some of the http pipelining work to the server module. This extracted
work is used to implement pipelining for the nio plugin.
This commit is contained in:
Tim Brooks 2018-05-22 09:29:31 -06:00 committed by GitHub
parent 37f67d9e21
commit 31251c9a6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 989 additions and 646 deletions

View File

@ -29,6 +29,14 @@
[[remove-http-enabled]] [[remove-http-enabled]]
==== Http enabled setting removed ==== Http enabled setting removed
The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing * The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing
use of the transport client. This setting has been removed, as the transport client use of the transport client. This setting has been removed, as the transport client
will be removed in the future, thus requiring HTTP to always be enabled. will be removed in the future, thus requiring HTTP to always be enabled.
[[remove-http-pipelining-setting]]
==== Http pipelining setting removed
* The setting `http.pipelining` previously allowed disabling HTTP pipelining support.
This setting has been removed, as disabling http pipelining support on the server
provided little value. The setting `http.pipelining.max_events` can still be used to
limit the number of pipelined requests in-flight.

View File

@ -96,8 +96,6 @@ and stack traces in response output. Note: When set to `false` and the `error_tr
parameter is specified, an error will be returned; when `error_trace` is not specified, a parameter is specified, an error will be returned; when `error_trace` is not specified, a
simple message will be returned. Defaults to `true` simple message will be returned. Defaults to `true`
|`http.pipelining` |Enable or disable HTTP pipelining, defaults to `true`.
|`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`. |`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`.
|`http.max_warning_header_count` |The maximum number of warning headers in |`http.max_warning_header_count` |The maximum number of warning headers in

View File

@ -42,7 +42,6 @@ import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
@ -59,29 +58,24 @@ final class Netty4HttpChannel extends AbstractRestChannel {
private final Netty4HttpServerTransport transport; private final Netty4HttpServerTransport transport;
private final Channel channel; private final Channel channel;
private final FullHttpRequest nettyRequest; private final FullHttpRequest nettyRequest;
private final HttpPipelinedRequest pipelinedRequest; private final int sequence;
private final ThreadContext threadContext; private final ThreadContext threadContext;
private final HttpHandlingSettings handlingSettings; private final HttpHandlingSettings handlingSettings;
/** /**
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to. * @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
* @param request The request that is handled by this channel. * @param request The request that is handled by this channel.
* @param pipelinedRequest If HTTP pipelining is enabled provide the corresponding pipelined request. May be null if * @param sequence The pipelining sequence number for this request
* HTTP pipelining is disabled. * @param handlingSettings true if error messages should include stack traces.
* @param handlingSettings true iff error messages should include stack traces. * @param threadContext the thread context for the channel
* @param threadContext the thread context for the channel
*/ */
Netty4HttpChannel( Netty4HttpChannel(Netty4HttpServerTransport transport, Netty4HttpRequest request, int sequence, HttpHandlingSettings handlingSettings,
final Netty4HttpServerTransport transport, ThreadContext threadContext) {
final Netty4HttpRequest request,
final HttpPipelinedRequest pipelinedRequest,
final HttpHandlingSettings handlingSettings,
final ThreadContext threadContext) {
super(request, handlingSettings.getDetailedErrorsEnabled()); super(request, handlingSettings.getDetailedErrorsEnabled());
this.transport = transport; this.transport = transport;
this.channel = request.getChannel(); this.channel = request.getChannel();
this.nettyRequest = request.request(); this.nettyRequest = request.request();
this.pipelinedRequest = pipelinedRequest; this.sequence = sequence;
this.threadContext = threadContext; this.threadContext = threadContext;
this.handlingSettings = handlingSettings; this.handlingSettings = handlingSettings;
} }
@ -129,7 +123,7 @@ final class Netty4HttpChannel extends AbstractRestChannel {
final ChannelPromise promise = channel.newPromise(); final ChannelPromise promise = channel.newPromise();
if (releaseContent) { if (releaseContent) {
promise.addListener(f -> ((Releasable)content).close()); promise.addListener(f -> ((Releasable) content).close());
} }
if (releaseBytesStreamOutput) { if (releaseBytesStreamOutput) {
@ -140,13 +134,9 @@ final class Netty4HttpChannel extends AbstractRestChannel {
promise.addListener(ChannelFutureListener.CLOSE); promise.addListener(ChannelFutureListener.CLOSE);
} }
final Object msg; Netty4HttpResponse newResponse = new Netty4HttpResponse(sequence, resp);
if (pipelinedRequest != null) {
msg = pipelinedRequest.createHttpResponse(resp, promise); channel.writeAndFlush(newResponse, promise);
} else {
msg = resp;
}
channel.writeAndFlush(msg, promise);
releaseContent = false; releaseContent = false;
releaseBytesStreamOutput = false; releaseBytesStreamOutput = false;
} finally { } finally {
@ -156,9 +146,6 @@ final class Netty4HttpChannel extends AbstractRestChannel {
if (releaseBytesStreamOutput) { if (releaseBytesStreamOutput) {
bytesOutputOrNull().close(); bytesOutputOrNull().close();
} }
if (pipelinedRequest != null) {
pipelinedRequest.release();
}
} }
} }

View File

@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty4;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.HttpPipeliningAggregator;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.List;
/**
* Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests.
*/
public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
private final Logger logger;
private final HttpPipeliningAggregator<Netty4HttpResponse, ChannelPromise> aggregator;
/**
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
*
* @param logger for logging unexpected errors
* @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is
* required as events cannot queue up indefinitely
*/
public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) {
this.logger = logger;
this.aggregator = new HttpPipeliningAggregator<>(maxEventsHeld);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof LastHttpContent) {
HttpPipelinedRequest<LastHttpContent> pipelinedRequest = aggregator.read(((LastHttpContent) msg).retain());
ctx.fireChannelRead(pipelinedRequest);
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof Netty4HttpResponse : "Message must be type: " + Netty4HttpResponse.class;
Netty4HttpResponse response = (Netty4HttpResponse) msg;
boolean success = false;
try {
List<Tuple<Netty4HttpResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
for (Tuple<Netty4HttpResponse, ChannelPromise> readyResponse : readyResponses) {
ctx.write(readyResponse.v1().getResponse(), readyResponse.v2());
}
success = true;
} catch (IllegalStateException e) {
ctx.channel().close();
} finally {
if (success == false) {
promise.setFailure(new ClosedChannelException());
}
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
List<Tuple<Netty4HttpResponse, ChannelPromise>> inflightResponses = aggregator.removeAllInflightResponses();
if (inflightResponses.isEmpty() == false) {
ClosedChannelException closedChannelException = new ClosedChannelException();
for (Tuple<Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
try {
inflightResponse.v2().setFailure(closedChannelException);
} catch (RuntimeException e) {
logger.error("unexpected error while releasing pipelined http responses", e);
}
}
}
ctx.close(promise);
}
}

View File

@ -30,41 +30,30 @@ import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.transport.netty4.Netty4Utils; import org.elasticsearch.transport.netty4.Netty4Utils;
import java.util.Collections; import java.util.Collections;
@ChannelHandler.Sharable @ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> { class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
private final Netty4HttpServerTransport serverTransport; private final Netty4HttpServerTransport serverTransport;
private final HttpHandlingSettings handlingSettings; private final HttpHandlingSettings handlingSettings;
private final boolean httpPipeliningEnabled;
private final ThreadContext threadContext; private final ThreadContext threadContext;
Netty4HttpRequestHandler(Netty4HttpServerTransport serverTransport, HttpHandlingSettings handlingSettings, Netty4HttpRequestHandler(Netty4HttpServerTransport serverTransport, HttpHandlingSettings handlingSettings,
ThreadContext threadContext) { ThreadContext threadContext) {
this.serverTransport = serverTransport; this.serverTransport = serverTransport;
this.httpPipeliningEnabled = serverTransport.pipelining;
this.handlingSettings = handlingSettings; this.handlingSettings = handlingSettings;
this.threadContext = threadContext; this.threadContext = threadContext;
} }
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
final FullHttpRequest request; final FullHttpRequest request = msg.getRequest();
final HttpPipelinedRequest pipelinedRequest;
if (this.httpPipeliningEnabled && msg instanceof HttpPipelinedRequest) {
pipelinedRequest = (HttpPipelinedRequest) msg;
request = (FullHttpRequest) pipelinedRequest.last();
} else {
pipelinedRequest = null;
request = (FullHttpRequest) msg;
}
boolean success = false;
try { try {
final FullHttpRequest copy = final FullHttpRequest copy =
@ -111,7 +100,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {
Netty4HttpChannel innerChannel; Netty4HttpChannel innerChannel;
try { try {
innerChannel = innerChannel =
new Netty4HttpChannel(serverTransport, httpRequest, pipelinedRequest, handlingSettings, threadContext); new Netty4HttpChannel(serverTransport, httpRequest, msg.getSequence(), handlingSettings, threadContext);
} catch (final IllegalArgumentException e) { } catch (final IllegalArgumentException e) {
if (badRequestCause == null) { if (badRequestCause == null) {
badRequestCause = e; badRequestCause = e;
@ -126,7 +115,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {
copy, copy,
ctx.channel()); ctx.channel());
innerChannel = innerChannel =
new Netty4HttpChannel(serverTransport, innerRequest, pipelinedRequest, handlingSettings, threadContext); new Netty4HttpChannel(serverTransport, innerRequest, msg.getSequence(), handlingSettings, threadContext);
} }
channel = innerChannel; channel = innerChannel;
} }
@ -138,12 +127,9 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<Object> {
} else { } else {
serverTransport.dispatchRequest(httpRequest, channel); serverTransport.dispatchRequest(httpRequest, channel);
} }
success = true;
} finally { } finally {
// the request is otherwise released in case of dispatch // As we have copied the buffer, we can release the request
if (success == false && pipelinedRequest != null) { request.release();
pipelinedRequest.release();
}
} }
} }

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty4;
import io.netty.handler.codec.http.FullHttpResponse;
import org.elasticsearch.http.HttpPipelinedMessage;
public class Netty4HttpResponse extends HttpPipelinedMessage {
private final FullHttpResponse response;
public Netty4HttpResponse(int sequence, FullHttpResponse response) {
super(sequence);
this.response = response;
}
public FullHttpResponse getResponse() {
return response;
}
}

View File

@ -62,7 +62,6 @@ import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig; import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder; import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipeliningHandler;
import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler; import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
@ -99,7 +98,6 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_D
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
import static org.elasticsearch.http.netty4.cors.Netty4CorsHandler.ANY_ORIGIN; import static org.elasticsearch.http.netty4.cors.Netty4CorsHandler.ANY_ORIGIN;
@ -162,8 +160,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
protected final int workerCount; protected final int workerCount;
protected final boolean pipelining;
protected final int pipeliningMaxEvents; protected final int pipeliningMaxEvents;
/** /**
@ -204,6 +200,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings); this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
this.httpHandlingSettings = new HttpHandlingSettings(Math.toIntExact(maxContentLength.getBytes()), this.httpHandlingSettings = new HttpHandlingSettings(Math.toIntExact(maxContentLength.getBytes()),
Math.toIntExact(maxChunkSize.getBytes()), Math.toIntExact(maxChunkSize.getBytes()),
Math.toIntExact(maxHeaderSize.getBytes()), Math.toIntExact(maxHeaderSize.getBytes()),
@ -211,7 +208,8 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
SETTING_HTTP_RESET_COOKIES.get(settings), SETTING_HTTP_RESET_COOKIES.get(settings),
SETTING_HTTP_COMPRESSION.get(settings), SETTING_HTTP_COMPRESSION.get(settings),
SETTING_HTTP_COMPRESSION_LEVEL.get(settings), SETTING_HTTP_COMPRESSION_LEVEL.get(settings),
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings)); SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings),
pipeliningMaxEvents);
this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings); this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings); this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
@ -226,14 +224,12 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings); ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt()); recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictor.bytesAsInt());
this.pipelining = SETTING_PIPELINING.get(settings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
this.corsConfig = buildCorsConfig(settings); this.corsConfig = buildCorsConfig(settings);
logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " + logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " +
"receive_predictor[{}], max_composite_buffer_components[{}], pipelining[{}], pipelining_max_events[{}]", "receive_predictor[{}], max_composite_buffer_components[{}], pipelining_max_events[{}]",
maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictor, maxCompositeBufferComponents, maxChunkSize, maxHeaderSize, maxInitialLineLength, maxContentLength, receivePredictor, maxCompositeBufferComponents,
pipelining, pipeliningMaxEvents); pipeliningMaxEvents);
} }
public Settings settings() { public Settings settings() {
@ -452,9 +448,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
if (SETTING_CORS_ENABLED.get(transport.settings())) { if (SETTING_CORS_ENABLED.get(transport.settings())) {
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig())); ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
} }
if (transport.pipelining) { ch.pipeline().addLast("pipelining", new Netty4HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
ch.pipeline().addLast("pipelining", new HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
}
ch.pipeline().addLast("handler", requestHandler); ch.pipeline().addLast("handler", requestHandler);
} }

View File

@ -1,88 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty4.pipelining;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCounted;
/**
* Permits downstream channel events to be ordered and signalled as to whether more are to come for
* a given sequence.
*/
public class HttpPipelinedRequest implements ReferenceCounted {
private final LastHttpContent last;
private final int sequence;
public HttpPipelinedRequest(final LastHttpContent last, final int sequence) {
this.last = last;
this.sequence = sequence;
}
public LastHttpContent last() {
return last;
}
public HttpPipelinedResponse createHttpResponse(final FullHttpResponse response, final ChannelPromise promise) {
return new HttpPipelinedResponse(response, promise, sequence);
}
@Override
public int refCnt() {
return last.refCnt();
}
@Override
public ReferenceCounted retain() {
last.retain();
return this;
}
@Override
public ReferenceCounted retain(int increment) {
last.retain(increment);
return this;
}
@Override
public ReferenceCounted touch() {
last.touch();
return this;
}
@Override
public ReferenceCounted touch(Object hint) {
last.touch(hint);
return this;
}
@Override
public boolean release() {
return last.release();
}
@Override
public boolean release(int decrement) {
return last.release(decrement);
}
}

View File

@ -1,94 +0,0 @@
package org.elasticsearch.http.netty4.pipelining;
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.ReferenceCounted;
class HttpPipelinedResponse implements Comparable<HttpPipelinedResponse>, ReferenceCounted {
private final FullHttpResponse response;
private final ChannelPromise promise;
private final int sequence;
HttpPipelinedResponse(FullHttpResponse response, ChannelPromise promise, int sequence) {
this.response = response;
this.promise = promise;
this.sequence = sequence;
}
public FullHttpResponse response() {
return response;
}
public ChannelPromise promise() {
return promise;
}
public int sequence() {
return sequence;
}
@Override
public int compareTo(HttpPipelinedResponse o) {
return Integer.compare(sequence, o.sequence);
}
@Override
public int refCnt() {
return response.refCnt();
}
@Override
public ReferenceCounted retain() {
response.retain();
return this;
}
@Override
public ReferenceCounted retain(int increment) {
response.retain(increment);
return this;
}
@Override
public ReferenceCounted touch() {
response.touch();
return this;
}
@Override
public ReferenceCounted touch(Object hint) {
response.touch(hint);
return this;
}
@Override
public boolean release() {
return response.release();
}
@Override
public boolean release(int decrement) {
return response.release(decrement);
}
}

View File

@ -1,144 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty4.pipelining;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.PriorityQueue;
/**
* Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests.
*/
public class HttpPipeliningHandler extends ChannelDuplexHandler {
// we use a priority queue so that responses are ordered by their sequence number
private final PriorityQueue<HttpPipelinedResponse> holdingQueue;
private final Logger logger;
private final int maxEventsHeld;
/*
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
* channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the
* current write sequence, implying that all preceding messages have been written.
*/
private int readSequence;
private int writeSequence;
/**
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
*
* @param logger for logging unexpected errors
* @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is
* required as events cannot queue up indefinitely
*/
public HttpPipeliningHandler(Logger logger, final int maxEventsHeld) {
this.logger = logger;
this.maxEventsHeld = maxEventsHeld;
this.holdingQueue = new PriorityQueue<>(1);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (msg instanceof LastHttpContent) {
ctx.fireChannelRead(new HttpPipelinedRequest(((LastHttpContent) msg).retain(), readSequence++));
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof HttpPipelinedResponse) {
final HttpPipelinedResponse current = (HttpPipelinedResponse) msg;
/*
* We attach the promise to the response. When we invoke a write on the channel with the response, we must ensure that we invoke
* the write methods that accept the same promise that we have attached to the response otherwise as the response proceeds
* through the handler pipeline a different promise will be used until reaching this handler. Therefore, we assert here that the
* attached promise is identical to the provided promise as a safety mechanism that we are respecting this.
*/
assert current.promise() == promise;
boolean channelShouldClose = false;
synchronized (holdingQueue) {
if (holdingQueue.size() < maxEventsHeld) {
holdingQueue.add(current);
while (!holdingQueue.isEmpty()) {
/*
* Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence
* number does not match the current write sequence number then we have not processed all preceding responses yet.
*/
final HttpPipelinedResponse top = holdingQueue.peek();
if (top.sequence() != writeSequence) {
break;
}
holdingQueue.remove();
/*
* We must use the promise attached to the response; this is necessary since are going to hold a response until all
* responses that precede it in the pipeline are written first. Note that the promise from the method invocation is
* not ignored, it will already be attached to an existing response and consumed when that response is drained.
*/
ctx.write(top.response(), top.promise());
writeSequence++;
}
} else {
channelShouldClose = true;
}
}
if (channelShouldClose) {
try {
Netty4Utils.closeChannels(Collections.singletonList(ctx.channel()));
} finally {
current.release();
promise.setSuccess();
}
}
} else {
ctx.write(msg, promise);
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (holdingQueue.isEmpty() == false) {
ClosedChannelException closedChannelException = new ClosedChannelException();
HttpPipelinedResponse pipelinedResponse;
while ((pipelinedResponse = holdingQueue.poll()) != null) {
try {
pipelinedResponse.release();
pipelinedResponse.promise().setFailure(closedChannelException);
} catch (Exception e) {
logger.error("unexpected error while releasing pipelined http responses", e);
}
}
}
ctx.close(promise);
}
}

View File

@ -60,7 +60,6 @@ import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.NullDispatcher;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestResponse;
@ -212,12 +211,12 @@ public class Netty4HttpChannelTests extends ESTestCase {
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
httpRequest.headers().add(HttpHeaderNames.ORIGIN, "remote"); httpRequest.headers().add(HttpHeaderNames.ORIGIN, "remote");
final WriteCapturingChannel writeCapturingChannel = new WriteCapturingChannel(); final WriteCapturingChannel writeCapturingChannel = new WriteCapturingChannel();
Netty4HttpRequest request = new Netty4HttpRequest(xContentRegistry(), httpRequest, writeCapturingChannel); final Netty4HttpRequest request = new Netty4HttpRequest(xContentRegistry(), httpRequest, writeCapturingChannel);
HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings;
// send a response // send a response
Netty4HttpChannel channel = Netty4HttpChannel channel =
new Netty4HttpChannel(httpServerTransport, request, null, handlingSettings, threadPool.getThreadContext()); new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext());
TestResponse resp = new TestResponse(); TestResponse resp = new TestResponse();
final String customHeader = "custom-header"; final String customHeader = "custom-header";
final String customHeaderValue = "xyz"; final String customHeaderValue = "xyz";
@ -227,7 +226,7 @@ public class Netty4HttpChannelTests extends ESTestCase {
// inspect what was written // inspect what was written
List<Object> writtenObjects = writeCapturingChannel.getWrittenObjects(); List<Object> writtenObjects = writeCapturingChannel.getWrittenObjects();
assertThat(writtenObjects.size(), is(1)); assertThat(writtenObjects.size(), is(1));
HttpResponse response = (HttpResponse) writtenObjects.get(0); HttpResponse response = ((Netty4HttpResponse) writtenObjects.get(0)).getResponse();
assertThat(response.headers().get("non-existent-header"), nullValue()); assertThat(response.headers().get("non-existent-header"), nullValue());
assertThat(response.headers().get(customHeader), equalTo(customHeaderValue)); assertThat(response.headers().get(customHeader), equalTo(customHeaderValue));
assertThat(response.headers().get(HttpHeaderNames.CONTENT_LENGTH), equalTo(Integer.toString(resp.content().length()))); assertThat(response.headers().get(HttpHeaderNames.CONTENT_LENGTH), equalTo(Integer.toString(resp.content().length())));
@ -243,10 +242,9 @@ public class Netty4HttpChannelTests extends ESTestCase {
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(); final EmbeddedChannel embeddedChannel = new EmbeddedChannel();
final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel); final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel);
final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null;
HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings;
final Netty4HttpChannel channel = final Netty4HttpChannel channel =
new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, handlingSettings, threadPool.getThreadContext()); new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext());
final TestResponse response = new TestResponse(bigArrays); final TestResponse response = new TestResponse(bigArrays);
assertThat(response.content(), instanceOf(Releasable.class)); assertThat(response.content(), instanceOf(Releasable.class));
embeddedChannel.close(); embeddedChannel.close();
@ -263,10 +261,9 @@ public class Netty4HttpChannelTests extends ESTestCase {
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(); final EmbeddedChannel embeddedChannel = new EmbeddedChannel();
final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel); final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel);
final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null;
HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings;
final Netty4HttpChannel channel = final Netty4HttpChannel channel =
new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, handlingSettings, threadPool.getThreadContext()); new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext());
final BytesRestResponse response = new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, final BytesRestResponse response = new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
JsonXContent.contentBuilder().startObject().endObject()); JsonXContent.contentBuilder().startObject().endObject());
assertThat(response.content(), not(instanceOf(Releasable.class))); assertThat(response.content(), not(instanceOf(Releasable.class)));
@ -312,7 +309,7 @@ public class Netty4HttpChannelTests extends ESTestCase {
assertTrue(embeddedChannel.isOpen()); assertTrue(embeddedChannel.isOpen());
HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings;
final Netty4HttpChannel channel = final Netty4HttpChannel channel =
new Netty4HttpChannel(httpServerTransport, request, null, handlingSettings, threadPool.getThreadContext()); new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext());
final TestResponse resp = new TestResponse(); final TestResponse resp = new TestResponse();
channel.sendResponse(resp); channel.sendResponse(resp);
assertThat(embeddedChannel.isOpen(), equalTo(!close)); assertThat(embeddedChannel.isOpen(), equalTo(!close));
@ -340,13 +337,13 @@ public class Netty4HttpChannelTests extends ESTestCase {
HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings;
Netty4HttpChannel channel = Netty4HttpChannel channel =
new Netty4HttpChannel(httpServerTransport, request, null, handlingSettings, threadPool.getThreadContext()); new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext());
channel.sendResponse(new TestResponse()); channel.sendResponse(new TestResponse());
// get the response // get the response
List<Object> writtenObjects = writeCapturingChannel.getWrittenObjects(); List<Object> writtenObjects = writeCapturingChannel.getWrittenObjects();
assertThat(writtenObjects.size(), is(1)); assertThat(writtenObjects.size(), is(1));
return (FullHttpResponse) writtenObjects.get(0); return ((Netty4HttpResponse) writtenObjects.get(0)).getResponse();
} }
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.http.netty4.pipelining; package org.elasticsearch.http.netty4;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
@ -37,6 +37,7 @@ import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.codec.http.QueryStringDecoder;
import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Randomness;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.junit.After; import org.junit.After;
@ -62,7 +63,8 @@ import static org.hamcrest.core.Is.is;
public class Netty4HttpPipeliningHandlerTests extends ESTestCase { public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
private final ExecutorService executorService = Executors.newFixedThreadPool(randomIntBetween(4, 8)); private final ExecutorService handlerService = Executors.newFixedThreadPool(randomIntBetween(4, 8));
private final ExecutorService eventLoopService = Executors.newFixedThreadPool(1);
private final Map<String, CountDownLatch> waitingRequests = new ConcurrentHashMap<>(); private final Map<String, CountDownLatch> waitingRequests = new ConcurrentHashMap<>();
private final Map<String, CountDownLatch> finishingRequests = new ConcurrentHashMap<>(); private final Map<String, CountDownLatch> finishingRequests = new ConcurrentHashMap<>();
@ -79,15 +81,19 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
} }
private void shutdownExecutorService() throws InterruptedException { private void shutdownExecutorService() throws InterruptedException {
if (!executorService.isShutdown()) { if (!handlerService.isShutdown()) {
executorService.shutdown(); handlerService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS); handlerService.awaitTermination(10, TimeUnit.SECONDS);
}
if (!eventLoopService.isShutdown()) {
eventLoopService.shutdown();
eventLoopService.awaitTermination(10, TimeUnit.SECONDS);
} }
} }
public void testThatPipeliningWorksWithFastSerializedRequests() throws InterruptedException { public void testThatPipeliningWorksWithFastSerializedRequests() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128); final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests), final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler()); new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) { for (int i = 0; i < numberOfRequests; i++) {
@ -114,7 +120,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws InterruptedException { public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128); final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests), final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler()); new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) { for (int i = 0; i < numberOfRequests; i++) {
@ -147,7 +153,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
final EmbeddedChannel embeddedChannel = final EmbeddedChannel embeddedChannel =
new EmbeddedChannel( new EmbeddedChannel(
new AggregateUrisAndHeadersHandler(), new AggregateUrisAndHeadersHandler(),
new HttpPipeliningHandler(logger, numberOfRequests), new Netty4HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler()); new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) { for (int i = 0; i < numberOfRequests; i++) {
@ -176,7 +182,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException { public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128); final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests), final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler()); new WorkEmulatorHandler());
for (int i = 0; i < 1 + numberOfRequests + 1; i++) { for (int i = 0; i < 1 + numberOfRequests + 1; i++) {
@ -184,7 +190,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
} }
final List<CountDownLatch> latches = new ArrayList<>(); final List<CountDownLatch> latches = new ArrayList<>();
final List<Integer> requests = IntStream.range(1, numberOfRequests + 1).mapToObj(r -> r).collect(Collectors.toList()); final List<Integer> requests = IntStream.range(1, numberOfRequests + 1).boxed().collect(Collectors.toList());
Randomness.shuffle(requests); Randomness.shuffle(requests);
for (final Integer request : requests) { for (final Integer request : requests) {
@ -205,25 +211,26 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
public void testPipeliningRequestsAreReleased() throws InterruptedException { public void testPipeliningRequestsAreReleased() throws InterruptedException {
final int numberOfRequests = 10; final int numberOfRequests = 10;
final EmbeddedChannel embeddedChannel = final EmbeddedChannel embeddedChannel =
new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests + 1)); new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests + 1));
for (int i = 0; i < numberOfRequests; i++) { for (int i = 0; i < numberOfRequests; i++) {
embeddedChannel.writeInbound(createHttpRequest("/" + i)); embeddedChannel.writeInbound(createHttpRequest("/" + i));
} }
HttpPipelinedRequest inbound; HttpPipelinedRequest<FullHttpRequest> inbound;
ArrayList<HttpPipelinedRequest> requests = new ArrayList<>(); ArrayList<HttpPipelinedRequest<FullHttpRequest>> requests = new ArrayList<>();
while ((inbound = embeddedChannel.readInbound()) != null) { while ((inbound = embeddedChannel.readInbound()) != null) {
requests.add(inbound); requests.add(inbound);
} }
ArrayList<ChannelPromise> promises = new ArrayList<>(); ArrayList<ChannelPromise> promises = new ArrayList<>();
for (int i = 1; i < requests.size(); ++i) { for (int i = 1; i < requests.size(); ++i) {
final DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK); final FullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK);
ChannelPromise promise = embeddedChannel.newPromise(); ChannelPromise promise = embeddedChannel.newPromise();
promises.add(promise); promises.add(promise);
HttpPipelinedResponse response = requests.get(i).createHttpResponse(httpResponse, promise); int sequence = requests.get(i).getSequence();
embeddedChannel.writeAndFlush(response, promise); Netty4HttpResponse resp = new Netty4HttpResponse(sequence, httpResponse);
embeddedChannel.writeAndFlush(resp, promise);
} }
for (ChannelPromise promise : promises) { for (ChannelPromise promise : promises) {
@ -260,14 +267,14 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
} }
private class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> { private class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<LastHttpContent>> {
@Override @Override
protected void channelRead0(final ChannelHandlerContext ctx, final HttpPipelinedRequest pipelinedRequest) throws Exception { protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest<LastHttpContent> pipelinedRequest) {
LastHttpContent request = pipelinedRequest.getRequest();
final QueryStringDecoder decoder; final QueryStringDecoder decoder;
if (pipelinedRequest.last() instanceof FullHttpRequest) { if (request instanceof FullHttpRequest) {
final FullHttpRequest fullHttpRequest = (FullHttpRequest) pipelinedRequest.last(); decoder = new QueryStringDecoder(((FullHttpRequest)request).uri());
decoder = new QueryStringDecoder(fullHttpRequest.uri());
} else { } else {
decoder = new QueryStringDecoder(AggregateUrisAndHeadersHandler.QUEUE_URI.poll()); decoder = new QueryStringDecoder(AggregateUrisAndHeadersHandler.QUEUE_URI.poll());
} }
@ -282,12 +289,14 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
final CountDownLatch finishingLatch = new CountDownLatch(1); final CountDownLatch finishingLatch = new CountDownLatch(1);
finishingRequests.put(uri, finishingLatch); finishingRequests.put(uri, finishingLatch);
executorService.submit(() -> { handlerService.submit(() -> {
try { try {
waitingLatch.await(1000, TimeUnit.SECONDS); waitingLatch.await(1000, TimeUnit.SECONDS);
final ChannelPromise promise = ctx.newPromise(); final ChannelPromise promise = ctx.newPromise();
ctx.write(pipelinedRequest.createHttpResponse(httpResponse, promise), promise); eventLoopService.submit(() -> {
finishingLatch.countDown(); ctx.write(new Netty4HttpResponse(pipelinedRequest.getSequence(), httpResponse), promise);
finishingLatch.countDown();
});
} catch (InterruptedException e) { } catch (InterruptedException e) {
fail(e.toString()); fail(e.toString());
} }

View File

@ -38,9 +38,9 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.NullDispatcher;
import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
@ -52,16 +52,11 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import static org.elasticsearch.test.hamcrest.RegexMatcher.matches;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
/** /**
* This test just tests, if he pipelining works in general with out any connection the Elasticsearch handler * This test just tests, if he pipelining works in general with out any connection the Elasticsearch handler
@ -85,9 +80,8 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
} }
} }
public void testThatHttpPipeliningWorksWhenEnabled() throws Exception { public void testThatHttpPipeliningWorks() throws Exception {
final Settings settings = Settings.builder() final Settings settings = Settings.builder()
.put("http.pipelining", true)
.put("http.port", "0") .put("http.port", "0")
.build(); .build();
try (HttpServerTransport httpServerTransport = new CustomNettyHttpServerTransport(settings)) { try (HttpServerTransport httpServerTransport = new CustomNettyHttpServerTransport(settings)) {
@ -112,48 +106,6 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
} }
} }
public void testThatHttpPipeliningCanBeDisabled() throws Exception {
final Settings settings = Settings.builder()
.put("http.pipelining", false)
.put("http.port", "0")
.build();
try (HttpServerTransport httpServerTransport = new CustomNettyHttpServerTransport(settings)) {
httpServerTransport.start();
final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses());
final int numberOfRequests = randomIntBetween(4, 16);
final Set<Integer> slowIds = new HashSet<>();
final List<String> requests = new ArrayList<>(numberOfRequests);
for (int i = 0; i < numberOfRequests; i++) {
if (rarely()) {
requests.add("/slow/" + i);
slowIds.add(i);
} else {
requests.add("/" + i);
}
}
try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{}));
List<String> responseBodies = new ArrayList<>(Netty4HttpClient.returnHttpResponseBodies(responses));
// we can not be sure about the order of the responses, but the slow ones should come last
assertThat(responseBodies, hasSize(numberOfRequests));
for (int i = 0; i < numberOfRequests - slowIds.size(); i++) {
assertThat(responseBodies.get(i), matches("/\\d+"));
}
final Set<Integer> ids = new HashSet<>();
for (int i = 0; i < slowIds.size(); i++) {
final String response = responseBodies.get(numberOfRequests - slowIds.size() + i);
assertThat(response, matches("/slow/\\d+" ));
assertTrue(ids.add(Integer.parseInt(response.split("/")[2])));
}
assertThat(slowIds, equalTo(ids));
}
}
}
class CustomNettyHttpServerTransport extends Netty4HttpServerTransport { class CustomNettyHttpServerTransport extends Netty4HttpServerTransport {
private final ExecutorService executorService = Executors.newCachedThreadPool(); private final ExecutorService executorService = Executors.newCachedThreadPool();
@ -196,7 +148,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
} }
class PossiblySlowUpstreamHandler extends SimpleChannelInboundHandler<Object> { class PossiblySlowUpstreamHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
private final ExecutorService executorService; private final ExecutorService executorService;
@ -205,7 +157,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
} }
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
executorService.submit(new PossiblySlowRunnable(ctx, msg)); executorService.submit(new PossiblySlowRunnable(ctx, msg));
} }
@ -220,26 +172,18 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
class PossiblySlowRunnable implements Runnable { class PossiblySlowRunnable implements Runnable {
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private HttpPipelinedRequest pipelinedRequest; private HttpPipelinedRequest<FullHttpRequest> pipelinedRequest;
private FullHttpRequest fullHttpRequest; private FullHttpRequest fullHttpRequest;
PossiblySlowRunnable(ChannelHandlerContext ctx, Object msg) { PossiblySlowRunnable(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
this.ctx = ctx; this.ctx = ctx;
if (msg instanceof HttpPipelinedRequest) { this.pipelinedRequest = msg;
this.pipelinedRequest = (HttpPipelinedRequest) msg; this.fullHttpRequest = pipelinedRequest.getRequest();
} else if (msg instanceof FullHttpRequest) {
this.fullHttpRequest = (FullHttpRequest) msg;
}
} }
@Override @Override
public void run() { public void run() {
final String uri; final String uri = fullHttpRequest.uri();
if (pipelinedRequest != null && pipelinedRequest.last() instanceof FullHttpRequest) {
uri = ((FullHttpRequest) pipelinedRequest.last()).uri();
} else {
uri = fullHttpRequest.uri();
}
final ByteBuf buffer = Unpooled.copiedBuffer(uri, StandardCharsets.UTF_8); final ByteBuf buffer = Unpooled.copiedBuffer(uri, StandardCharsets.UTF_8);
@ -258,13 +202,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
} }
final ChannelPromise promise = ctx.newPromise(); final ChannelPromise promise = ctx.newPromise();
final Object msg; ctx.writeAndFlush(new Netty4HttpResponse(pipelinedRequest.getSequence(), httpResponse), promise);
if (pipelinedRequest != null) {
msg = pipelinedRequest.createHttpResponse(httpResponse, promise);
} else {
msg = httpResponse;
}
ctx.writeAndFlush(msg, promise);
} }
} }

View File

@ -21,8 +21,6 @@ package org.elasticsearch.http.netty4;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
import org.elasticsearch.ESNetty4IntegTestCase; import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@ -35,21 +33,13 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) @ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class Netty4PipeliningEnabledIT extends ESNetty4IntegTestCase { public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
@Override @Override
protected boolean addMockHttpTransport() { protected boolean addMockHttpTransport() {
return false; // enable http return false; // enable http
} }
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("http.pipelining", true)
.build();
}
public void testThatNettyHttpServerSupportsPipelining() throws Exception { public void testThatNettyHttpServerSupportsPipelining() throws Exception {
String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"}; String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"};

View File

@ -25,20 +25,21 @@ import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseEncoder;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.FlushOperation;
import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.ReadWriteHandler;
import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ReadWriteHandler;
import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.WriteOperation; import org.elasticsearch.nio.WriteOperation;
import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestRequest;
@ -77,6 +78,7 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
if (settings.isCompression()) { if (settings.isCompression()) {
handlers.add(new HttpContentCompressor(settings.getCompressionLevel())); handlers.add(new HttpContentCompressor(settings.getCompressionLevel()));
} }
handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents()));
adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0])); adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0]));
adaptor.addCloseListener((v, e) -> nioChannel.close()); adaptor.addCloseListener((v, e) -> nioChannel.close());
@ -95,9 +97,9 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
@Override @Override
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener) { public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Throwable> listener) {
assert message instanceof FullHttpResponse : "This channel only supports messages that are of type: " + FullHttpResponse.class assert message instanceof NioHttpResponse : "This channel only supports messages that are of type: "
+ ". Found type: " + message.getClass() + "."; + NioHttpResponse.class + ". Found type: " + message.getClass() + ".";
return new HttpWriteOperation(context, (FullHttpResponse) message, listener); return new HttpWriteOperation(context, (NioHttpResponse) message, listener);
} }
@Override @Override
@ -125,76 +127,85 @@ public class HttpReadWriteHandler implements ReadWriteHandler {
} }
} }
@SuppressWarnings("unchecked")
private void handleRequest(Object msg) { private void handleRequest(Object msg) {
final FullHttpRequest request = (FullHttpRequest) msg; final HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = (HttpPipelinedRequest<FullHttpRequest>) msg;
FullHttpRequest request = pipelinedRequest.getRequest();
final FullHttpRequest copiedRequest = try {
new DefaultFullHttpRequest( final FullHttpRequest copiedRequest =
request.protocolVersion(), new DefaultFullHttpRequest(
request.method(), request.protocolVersion(),
request.uri(), request.method(),
Unpooled.copiedBuffer(request.content()), request.uri(),
request.headers(), Unpooled.copiedBuffer(request.content()),
request.trailingHeaders()); request.headers(),
request.trailingHeaders());
Exception badRequestCause = null; Exception badRequestCause = null;
/* /*
* We want to create a REST request from the incoming request from Netty. However, creating this request could fail if there * We want to create a REST request from the incoming request from Netty. However, creating this request could fail if there
* are incorrectly encoded parameters, or the Content-Type header is invalid. If one of these specific failures occurs, we * are incorrectly encoded parameters, or the Content-Type header is invalid. If one of these specific failures occurs, we
* attempt to create a REST request again without the input that caused the exception (e.g., we remove the Content-Type header, * attempt to create a REST request again without the input that caused the exception (e.g., we remove the Content-Type header,
* or skip decoding the parameters). Once we have a request in hand, we then dispatch the request as a bad request with the * or skip decoding the parameters). Once we have a request in hand, we then dispatch the request as a bad request with the
* underlying exception that caused us to treat the request as bad. * underlying exception that caused us to treat the request as bad.
*/ */
final NioHttpRequest httpRequest; final NioHttpRequest httpRequest;
{ {
NioHttpRequest innerHttpRequest; NioHttpRequest innerHttpRequest;
try { try {
innerHttpRequest = new NioHttpRequest(xContentRegistry, copiedRequest); innerHttpRequest = new NioHttpRequest(xContentRegistry, copiedRequest);
} catch (final RestRequest.ContentTypeHeaderException e) { } catch (final RestRequest.ContentTypeHeaderException e) {
badRequestCause = e;
innerHttpRequest = requestWithoutContentTypeHeader(copiedRequest, badRequestCause);
} catch (final RestRequest.BadParameterException e) {
badRequestCause = e;
innerHttpRequest = requestWithoutParameters(copiedRequest);
}
httpRequest = innerHttpRequest;
}
/*
* We now want to create a channel used to send the response on. However, creating this channel can fail if there are invalid
* parameter values for any of the filter_path, human, or pretty parameters. We detect these specific failures via an
* IllegalArgumentException from the channel constructor and then attempt to create a new channel that bypasses parsing of these
* parameter values.
*/
final NioHttpChannel channel;
{
NioHttpChannel innerChannel;
try {
innerChannel = new NioHttpChannel(nioChannel, transport.getBigArrays(), httpRequest, settings, threadContext);
} catch (final IllegalArgumentException e) {
if (badRequestCause == null) {
badRequestCause = e; badRequestCause = e;
} else { innerHttpRequest = requestWithoutContentTypeHeader(copiedRequest, badRequestCause);
badRequestCause.addSuppressed(e); } catch (final RestRequest.BadParameterException e) {
badRequestCause = e;
innerHttpRequest = requestWithoutParameters(copiedRequest);
} }
final NioHttpRequest innerRequest = httpRequest = innerHttpRequest;
new NioHttpRequest(
xContentRegistry,
Collections.emptyMap(), // we are going to dispatch the request as a bad request, drop all parameters
copiedRequest.uri(),
copiedRequest);
innerChannel = new NioHttpChannel(nioChannel, transport.getBigArrays(), innerRequest, settings, threadContext);
} }
channel = innerChannel;
}
if (request.decoderResult().isFailure()) { /*
transport.dispatchBadRequest(httpRequest, channel, request.decoderResult().cause()); * We now want to create a channel used to send the response on. However, creating this channel can fail if there are invalid
} else if (badRequestCause != null) { * parameter values for any of the filter_path, human, or pretty parameters. We detect these specific failures via an
transport.dispatchBadRequest(httpRequest, channel, badRequestCause); * IllegalArgumentException from the channel constructor and then attempt to create a new channel that bypasses parsing of
} else { * these parameter values.
transport.dispatchRequest(httpRequest, channel); */
final NioHttpChannel channel;
{
NioHttpChannel innerChannel;
int sequence = pipelinedRequest.getSequence();
BigArrays bigArrays = transport.getBigArrays();
try {
innerChannel = new NioHttpChannel(nioChannel, bigArrays, httpRequest, sequence, settings, threadContext);
} catch (final IllegalArgumentException e) {
if (badRequestCause == null) {
badRequestCause = e;
} else {
badRequestCause.addSuppressed(e);
}
final NioHttpRequest innerRequest =
new NioHttpRequest(
xContentRegistry,
Collections.emptyMap(), // we are going to dispatch the request as a bad request, drop all parameters
copiedRequest.uri(),
copiedRequest);
innerChannel = new NioHttpChannel(nioChannel, bigArrays, innerRequest, sequence, settings, threadContext);
}
channel = innerChannel;
}
if (request.decoderResult().isFailure()) {
transport.dispatchBadRequest(httpRequest, channel, request.decoderResult().cause());
} else if (badRequestCause != null) {
transport.dispatchBadRequest(httpRequest, channel, badRequestCause);
} else {
transport.dispatchRequest(httpRequest, channel);
}
} finally {
// As we have copied the buffer, we can release the request
request.release();
} }
} }

View File

@ -19,7 +19,6 @@
package org.elasticsearch.http.nio; package org.elasticsearch.http.nio;
import io.netty.handler.codec.http.FullHttpResponse;
import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.WriteOperation; import org.elasticsearch.nio.WriteOperation;
@ -28,10 +27,10 @@ import java.util.function.BiConsumer;
public class HttpWriteOperation implements WriteOperation { public class HttpWriteOperation implements WriteOperation {
private final SocketChannelContext channelContext; private final SocketChannelContext channelContext;
private final FullHttpResponse response; private final NioHttpResponse response;
private final BiConsumer<Void, Throwable> listener; private final BiConsumer<Void, Throwable> listener;
HttpWriteOperation(SocketChannelContext channelContext, FullHttpResponse response, BiConsumer<Void, Throwable> listener) { HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer<Void, Throwable> listener) {
this.channelContext = channelContext; this.channelContext = channelContext;
this.response = response; this.response = response;
this.listener = listener; this.listener = listener;
@ -48,7 +47,7 @@ public class HttpWriteOperation implements WriteOperation {
} }
@Override @Override
public FullHttpResponse getObject() { public NioHttpResponse getObject() {
return response; return response;
} }
} }

View File

@ -53,12 +53,7 @@ public class NettyAdaptor implements AutoCloseable {
try { try {
ByteBuf message = (ByteBuf) msg; ByteBuf message = (ByteBuf) msg;
promise.addListener((f) -> message.release()); promise.addListener((f) -> message.release());
NettyListener listener; NettyListener listener = NettyListener.fromChannelPromise(promise);
if (promise instanceof NettyListener) {
listener = (NettyListener) promise;
} else {
listener = new NettyListener(promise);
}
flushOperations.add(new FlushOperation(message.nioBuffers(), listener)); flushOperations.add(new FlushOperation(message.nioBuffers(), listener));
} catch (Exception e) { } catch (Exception e) {
promise.setFailure(e); promise.setFailure(e);
@ -107,18 +102,7 @@ public class NettyAdaptor implements AutoCloseable {
} }
public void write(WriteOperation writeOperation) { public void write(WriteOperation writeOperation) {
ChannelPromise channelPromise = nettyChannel.newPromise(); nettyChannel.writeAndFlush(writeOperation.getObject(), NettyListener.fromBiConsumer(writeOperation.getListener(), nettyChannel));
channelPromise.addListener(f -> {
BiConsumer<Void, Throwable> consumer = writeOperation.getListener();
if (f.cause() == null) {
consumer.accept(null, null);
} else {
ExceptionsHelper.dieOnError(f.cause());
consumer.accept(null, f.cause());
}
});
nettyChannel.writeAndFlush(writeOperation.getObject(), new NettyListener(channelPromise));
} }
public FlushOperation pollOutboundOperation() { public FlushOperation pollOutboundOperation() {

View File

@ -23,7 +23,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -40,7 +40,7 @@ public class NettyListener implements BiConsumer<Void, Throwable>, ChannelPromis
private final ChannelPromise promise; private final ChannelPromise promise;
NettyListener(ChannelPromise promise) { private NettyListener(ChannelPromise promise) {
this.promise = promise; this.promise = promise;
} }
@ -211,4 +211,30 @@ public class NettyListener implements BiConsumer<Void, Throwable>, ChannelPromis
public ChannelPromise unvoid() { public ChannelPromise unvoid() {
return promise.unvoid(); return promise.unvoid();
} }
public static NettyListener fromBiConsumer(BiConsumer<Void, Throwable> biConsumer, Channel channel) {
if (biConsumer instanceof NettyListener) {
return (NettyListener) biConsumer;
} else {
ChannelPromise channelPromise = channel.newPromise();
channelPromise.addListener(f -> {
if (f.cause() == null) {
biConsumer.accept(null, null);
} else {
ExceptionsHelper.dieOnError(f.cause());
biConsumer.accept(null, f.cause());
}
});
return new NettyListener(channelPromise);
}
}
public static NettyListener fromChannelPromise(ChannelPromise channelPromise) {
if (channelPromise instanceof NettyListener) {
return (NettyListener) channelPromise;
} else {
return new NettyListener(channelPromise);
}
}
} }

View File

@ -52,20 +52,23 @@ import java.util.EnumMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.function.BiConsumer;
public class NioHttpChannel extends AbstractRestChannel { public class NioHttpChannel extends AbstractRestChannel {
private final BigArrays bigArrays; private final BigArrays bigArrays;
private final int sequence;
private final ThreadContext threadContext; private final ThreadContext threadContext;
private final FullHttpRequest nettyRequest; private final FullHttpRequest nettyRequest;
private final NioSocketChannel nioChannel; private final NioSocketChannel nioChannel;
private final boolean resetCookies; private final boolean resetCookies;
NioHttpChannel(NioSocketChannel nioChannel, BigArrays bigArrays, NioHttpRequest request, NioHttpChannel(NioSocketChannel nioChannel, BigArrays bigArrays, NioHttpRequest request, int sequence,
HttpHandlingSettings settings, ThreadContext threadContext) { HttpHandlingSettings settings, ThreadContext threadContext) {
super(request, settings.getDetailedErrorsEnabled()); super(request, settings.getDetailedErrorsEnabled());
this.nioChannel = nioChannel; this.nioChannel = nioChannel;
this.bigArrays = bigArrays; this.bigArrays = bigArrays;
this.sequence = sequence;
this.threadContext = threadContext; this.threadContext = threadContext;
this.nettyRequest = request.getRequest(); this.nettyRequest = request.getRequest();
this.resetCookies = settings.isResetCookies(); this.resetCookies = settings.isResetCookies();
@ -117,9 +120,8 @@ public class NioHttpChannel extends AbstractRestChannel {
toClose.add(nioChannel::close); toClose.add(nioChannel::close);
} }
nioChannel.getContext().sendMessage(resp, (aVoid, throwable) -> { BiConsumer<Void, Throwable> listener = (aVoid, throwable) -> Releasables.close(toClose);
Releasables.close(toClose); nioChannel.getContext().sendMessage(new NioHttpResponse(sequence, resp), listener);
});
success = true; success = true;
} finally { } finally {
if (success == false) { if (success == false) {

View File

@ -0,0 +1,103 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.nio;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.HttpPipeliningAggregator;
import org.elasticsearch.http.nio.NettyListener;
import org.elasticsearch.http.nio.NioHttpResponse;
import java.nio.channels.ClosedChannelException;
import java.util.List;
/**
* Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests.
*/
public class NioHttpPipeliningHandler extends ChannelDuplexHandler {
private final Logger logger;
private final HttpPipeliningAggregator<NioHttpResponse, NettyListener> aggregator;
/**
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
*
* @param logger for logging unexpected errors
* @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is
* required as events cannot queue up indefinitely
*/
public NioHttpPipeliningHandler(Logger logger, final int maxEventsHeld) {
this.logger = logger;
this.aggregator = new HttpPipeliningAggregator<>(maxEventsHeld);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof LastHttpContent) {
HttpPipelinedRequest<LastHttpContent> pipelinedRequest = aggregator.read(((LastHttpContent) msg).retain());
ctx.fireChannelRead(pipelinedRequest);
} else {
ctx.fireChannelRead(msg);
}
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof NioHttpResponse : "Message must be type: " + NioHttpResponse.class;
NioHttpResponse response = (NioHttpResponse) msg;
boolean success = false;
try {
NettyListener listener = NettyListener.fromChannelPromise(promise);
List<Tuple<NioHttpResponse, NettyListener>> readyResponses = aggregator.write(response, listener);
success = true;
for (Tuple<NioHttpResponse, NettyListener> responseToWrite : readyResponses) {
ctx.write(responseToWrite.v1().getResponse(), responseToWrite.v2());
}
} catch (IllegalStateException e) {
ctx.channel().close();
} finally {
if (success == false) {
promise.setFailure(new ClosedChannelException());
}
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
List<Tuple<NioHttpResponse, NettyListener>> inflightResponses = aggregator.removeAllInflightResponses();
if (inflightResponses.isEmpty() == false) {
ClosedChannelException closedChannelException = new ClosedChannelException();
for (Tuple<NioHttpResponse, NettyListener> inflightResponse : inflightResponses) {
try {
inflightResponse.v2().setFailure(closedChannelException);
} catch (RuntimeException e) {
logger.error("unexpected error while releasing pipelined http responses", e);
}
}
}
ctx.close(promise);
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.nio;
import io.netty.handler.codec.http.FullHttpResponse;
import org.elasticsearch.http.HttpPipelinedMessage;
public class NioHttpResponse extends HttpPipelinedMessage {
private final FullHttpResponse response;
public NioHttpResponse(int sequence, FullHttpResponse response) {
super(sequence);
this.response = response;
}
public FullHttpResponse getResponse() {
return response;
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.http.nio; package org.elasticsearch.http.nio;
import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.ReadTimeoutException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
@ -84,6 +85,7 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_D
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
public class NioHttpServerTransport extends AbstractHttpServerTransport { public class NioHttpServerTransport extends AbstractHttpServerTransport {
@ -124,6 +126,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
ByteSizeValue maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings); ByteSizeValue maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
int pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);
this.httpHandlingSettings = new HttpHandlingSettings(Math.toIntExact(maxContentLength.getBytes()), this.httpHandlingSettings = new HttpHandlingSettings(Math.toIntExact(maxContentLength.getBytes()),
Math.toIntExact(maxChunkSize.getBytes()), Math.toIntExact(maxChunkSize.getBytes()),
Math.toIntExact(maxHeaderSize.getBytes()), Math.toIntExact(maxHeaderSize.getBytes()),
@ -131,7 +134,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
SETTING_HTTP_RESET_COOKIES.get(settings), SETTING_HTTP_RESET_COOKIES.get(settings),
SETTING_HTTP_COMPRESSION.get(settings), SETTING_HTTP_COMPRESSION.get(settings),
SETTING_HTTP_COMPRESSION_LEVEL.get(settings), SETTING_HTTP_COMPRESSION_LEVEL.get(settings),
SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings)); SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings),
pipeliningMaxEvents);
this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings); this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings);
this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings); this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings);
@ -140,14 +144,19 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
this.tcpReceiveBufferSize = Math.toIntExact(SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes()); this.tcpReceiveBufferSize = Math.toIntExact(SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes());
logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}]", logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}]," +
maxChunkSize, maxHeaderSize, maxInitialLineLength, maxContentLength); " pipelining_max_events[{}]",
maxChunkSize, maxHeaderSize, maxInitialLineLength, maxContentLength, pipeliningMaxEvents);
} }
BigArrays getBigArrays() { BigArrays getBigArrays() {
return bigArrays; return bigArrays;
} }
public Logger getLogger() {
return logger;
}
@Override @Override
protected void doStart() { protected void doStart() {
boolean success = false; boolean success = false;

View File

@ -20,6 +20,7 @@ package org.elasticsearch;
import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.nio.NioHttpServerTransport;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.nio.NioTransport; import org.elasticsearch.transport.nio.NioTransport;
@ -43,11 +44,13 @@ public abstract class NioIntegTestCase extends ESIntegTestCase {
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
// randomize netty settings // randomize nio settings
if (randomBoolean()) { if (randomBoolean()) {
builder.put(NioTransport.NIO_WORKER_COUNT.getKey(), random().nextInt(3) + 1); builder.put(NioTransport.NIO_WORKER_COUNT.getKey(), random().nextInt(3) + 1);
builder.put(NioHttpServerTransport.NIO_HTTP_WORKER_COUNT.getKey(), random().nextInt(3) + 1);
} }
builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NioTransportPlugin.NIO_TRANSPORT_NAME); builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NioTransportPlugin.NIO_TRANSPORT_NAME);
builder.put(NetworkModule.HTTP_TYPE_KEY, NioTransportPlugin.NIO_HTTP_TRANSPORT_NAME);
return builder.build(); return builder.build();
} }

View File

@ -61,11 +61,11 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUN
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
public class HttpReadWriteHandlerTests extends ESTestCase { public class HttpReadWriteHandlerTests extends ESTestCase {
@ -91,7 +91,8 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
SETTING_HTTP_RESET_COOKIES.getDefault(settings), SETTING_HTTP_RESET_COOKIES.getDefault(settings),
SETTING_HTTP_COMPRESSION.getDefault(settings), SETTING_HTTP_COMPRESSION.getDefault(settings),
SETTING_HTTP_COMPRESSION_LEVEL.getDefault(settings), SETTING_HTTP_COMPRESSION_LEVEL.getDefault(settings),
SETTING_HTTP_DETAILED_ERRORS_ENABLED.getDefault(settings)); SETTING_HTTP_DETAILED_ERRORS_ENABLED.getDefault(settings),
SETTING_PIPELINING_MAX_EVENTS.getDefault(settings));
ThreadContext threadContext = new ThreadContext(settings); ThreadContext threadContext = new ThreadContext(settings);
nioSocketChannel = mock(NioSocketChannel.class); nioSocketChannel = mock(NioSocketChannel.class);
handler = new HttpReadWriteHandler(nioSocketChannel, transport, httpHandlingSettings, NamedXContentRegistry.EMPTY, threadContext); handler = new HttpReadWriteHandler(nioSocketChannel, transport, httpHandlingSettings, NamedXContentRegistry.EMPTY, threadContext);
@ -148,7 +149,8 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
handler.consumeReads(toChannelBuffer(buf)); handler.consumeReads(toChannelBuffer(buf));
verifyZeroInteractions(transport); verify(transport, times(0)).dispatchBadRequest(any(), any(), any());
verify(transport, times(0)).dispatchRequest(any(), any());
List<FlushOperation> flushOperations = handler.pollFlushOperations(); List<FlushOperation> flushOperations = handler.pollFlushOperations();
assertFalse(flushOperations.isEmpty()); assertFalse(flushOperations.isEmpty());
@ -169,9 +171,10 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
prepareHandlerForResponse(handler); prepareHandlerForResponse(handler);
FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
NioHttpResponse pipelinedResponse = new NioHttpResponse(0, fullHttpResponse);
SocketChannelContext context = mock(SocketChannelContext.class); SocketChannelContext context = mock(SocketChannelContext.class);
HttpWriteOperation writeOperation = new HttpWriteOperation(context, fullHttpResponse, mock(BiConsumer.class)); HttpWriteOperation writeOperation = new HttpWriteOperation(context, pipelinedResponse, mock(BiConsumer.class));
List<FlushOperation> flushOperations = handler.writeToBytes(writeOperation); List<FlushOperation> flushOperations = handler.writeToBytes(writeOperation);
HttpResponse response = responseDecoder.decode(Unpooled.wrappedBuffer(flushOperations.get(0).getBuffersToWrite())); HttpResponse response = responseDecoder.decode(Unpooled.wrappedBuffer(flushOperations.get(0).getBuffersToWrite()));

View File

@ -0,0 +1,304 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.hamcrest.core.Is.is;
public class NioHttpPipeliningHandlerTests extends ESTestCase {
private final ExecutorService handlerService = Executors.newFixedThreadPool(randomIntBetween(4, 8));
private final ExecutorService eventLoopService = Executors.newFixedThreadPool(1);
private final Map<String, CountDownLatch> waitingRequests = new ConcurrentHashMap<>();
private final Map<String, CountDownLatch> finishingRequests = new ConcurrentHashMap<>();
@After
public void cleanup() throws Exception {
waitingRequests.keySet().forEach(this::finishRequest);
shutdownExecutorService();
}
private CountDownLatch finishRequest(String url) {
waitingRequests.get(url).countDown();
return finishingRequests.get(url);
}
private void shutdownExecutorService() throws InterruptedException {
if (!handlerService.isShutdown()) {
handlerService.shutdown();
handlerService.awaitTermination(10, TimeUnit.SECONDS);
}
if (!eventLoopService.isShutdown()) {
eventLoopService.shutdown();
eventLoopService.awaitTermination(10, TimeUnit.SECONDS);
}
}
public void testThatPipeliningWorksWithFastSerializedRequests() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) {
embeddedChannel.writeInbound(createHttpRequest("/" + String.valueOf(i)));
}
final List<CountDownLatch> latches = new ArrayList<>();
for (final String url : waitingRequests.keySet()) {
latches.add(finishRequest(url));
}
for (final CountDownLatch latch : latches) {
latch.await();
}
embeddedChannel.flush();
for (int i = 0; i < numberOfRequests; i++) {
assertReadHttpMessageHasContent(embeddedChannel, String.valueOf(i));
}
assertTrue(embeddedChannel.isOpen());
}
public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) {
embeddedChannel.writeInbound(createHttpRequest("/" + String.valueOf(i)));
}
// random order execution
final List<String> urls = new ArrayList<>(waitingRequests.keySet());
Randomness.shuffle(urls);
final List<CountDownLatch> latches = new ArrayList<>();
for (final String url : urls) {
latches.add(finishRequest(url));
}
for (final CountDownLatch latch : latches) {
latch.await();
}
embeddedChannel.flush();
for (int i = 0; i < numberOfRequests; i++) {
assertReadHttpMessageHasContent(embeddedChannel, String.valueOf(i));
}
assertTrue(embeddedChannel.isOpen());
}
public void testThatPipeliningWorksWithChunkedRequests() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel =
new EmbeddedChannel(
new AggregateUrisAndHeadersHandler(),
new NioHttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) {
final DefaultHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/" + i);
embeddedChannel.writeInbound(request);
embeddedChannel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT);
}
final List<CountDownLatch> latches = new ArrayList<>();
for (int i = numberOfRequests - 1; i >= 0; i--) {
latches.add(finishRequest(Integer.toString(i)));
}
for (final CountDownLatch latch : latches) {
latch.await();
}
embeddedChannel.flush();
for (int i = 0; i < numberOfRequests; i++) {
assertReadHttpMessageHasContent(embeddedChannel, Integer.toString(i));
}
assertTrue(embeddedChannel.isOpen());
}
public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < 1 + numberOfRequests + 1; i++) {
embeddedChannel.writeInbound(createHttpRequest("/" + Integer.toString(i)));
}
final List<CountDownLatch> latches = new ArrayList<>();
final List<Integer> requests = IntStream.range(1, numberOfRequests + 1).boxed().collect(Collectors.toList());
Randomness.shuffle(requests);
for (final Integer request : requests) {
latches.add(finishRequest(request.toString()));
}
for (final CountDownLatch latch : latches) {
latch.await();
}
finishRequest(Integer.toString(numberOfRequests + 1)).await();
embeddedChannel.flush();
assertFalse(embeddedChannel.isOpen());
}
public void testPipeliningRequestsAreReleased() throws InterruptedException {
final int numberOfRequests = 10;
final EmbeddedChannel embeddedChannel =
new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests + 1));
for (int i = 0; i < numberOfRequests; i++) {
embeddedChannel.writeInbound(createHttpRequest("/" + i));
}
HttpPipelinedRequest<FullHttpRequest> inbound;
ArrayList<HttpPipelinedRequest<FullHttpRequest>> requests = new ArrayList<>();
while ((inbound = embeddedChannel.readInbound()) != null) {
requests.add(inbound);
}
ArrayList<ChannelPromise> promises = new ArrayList<>();
for (int i = 1; i < requests.size(); ++i) {
final FullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK);
ChannelPromise promise = embeddedChannel.newPromise();
promises.add(promise);
int sequence = requests.get(i).getSequence();
NioHttpResponse resp = new NioHttpResponse(sequence, httpResponse);
embeddedChannel.writeAndFlush(resp, promise);
}
for (ChannelPromise promise : promises) {
assertFalse(promise.isDone());
}
embeddedChannel.close().syncUninterruptibly();
for (ChannelPromise promise : promises) {
assertTrue(promise.isDone());
assertTrue(promise.cause() instanceof ClosedChannelException);
}
}
private void assertReadHttpMessageHasContent(EmbeddedChannel embeddedChannel, String expectedContent) {
FullHttpResponse response = (FullHttpResponse) embeddedChannel.outboundMessages().poll();
assertNotNull("Expected response to exist, maybe you did not wait long enough?", response);
assertNotNull("Expected response to have content " + expectedContent, response.content());
String data = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8);
assertThat(data, is(expectedContent));
}
private FullHttpRequest createHttpRequest(String uri) {
return new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri);
}
private static class AggregateUrisAndHeadersHandler extends SimpleChannelInboundHandler<HttpRequest> {
static final Queue<String> QUEUE_URI = new LinkedTransferQueue<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception {
QUEUE_URI.add(request.uri());
}
}
private class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<LastHttpContent>> {
@Override
protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest<LastHttpContent> pipelinedRequest) {
LastHttpContent request = pipelinedRequest.getRequest();
final QueryStringDecoder decoder;
if (request instanceof FullHttpRequest) {
decoder = new QueryStringDecoder(((FullHttpRequest)request).uri());
} else {
decoder = new QueryStringDecoder(AggregateUrisAndHeadersHandler.QUEUE_URI.poll());
}
final String uri = decoder.path().replace("/", "");
final ByteBuf content = Unpooled.copiedBuffer(uri, StandardCharsets.UTF_8);
final DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK, content);
httpResponse.headers().add(CONTENT_LENGTH, content.readableBytes());
final CountDownLatch waitingLatch = new CountDownLatch(1);
waitingRequests.put(uri, waitingLatch);
final CountDownLatch finishingLatch = new CountDownLatch(1);
finishingRequests.put(uri, finishingLatch);
handlerService.submit(() -> {
try {
waitingLatch.await(1000, TimeUnit.SECONDS);
final ChannelPromise promise = ctx.newPromise();
eventLoopService.submit(() -> {
ctx.write(new NioHttpResponse(pipelinedRequest.getSequence(), httpResponse), promise);
finishingLatch.countDown();
});
} catch (InterruptedException e) {
fail(e.toString());
}
});
}
}
}

View File

@ -16,65 +16,53 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.elasticsearch.http.netty4;
package org.elasticsearch.http.nio;
import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse;
import org.elasticsearch.ESNetty4IntegTestCase; import org.elasticsearch.NioIntegTestCase;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.ESIntegTestCase.Scope;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) @ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
public class Netty4PipeliningDisabledIT extends ESNetty4IntegTestCase { public class NioPipeliningIT extends NioIntegTestCase {
@Override @Override
protected boolean addMockHttpTransport() { protected boolean addMockHttpTransport() {
return false; // enable http return false; // enable http
} }
@Override public void testThatNioHttpServerSupportsPipelining() throws Exception {
protected Settings nodeSettings(int nodeOrdinal) { String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"};
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("http.pipelining", false)
.build();
}
public void testThatNettyHttpServerDoesNotSupportPipelining() throws Exception {
ensureGreen();
String[] requests = new String[] {"/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/"};
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = (TransportAddress) randomFrom(boundAddresses); TransportAddress transportAddress = randomFrom(boundAddresses);
try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) { try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests); Collection<FullHttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests);
assertThat(responses, hasSize(requests.length)); assertThat(responses, hasSize(5));
List<String> opaqueIds = new ArrayList<>(Netty4HttpClient.returnOpaqueIds(responses)); Collection<String> opaqueIds = Netty4HttpClient.returnOpaqueIds(responses);
assertOpaqueIdsInOrder(opaqueIds);
assertResponsesOutOfOrder(opaqueIds);
} }
} }
/** private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
* checks if all responses are there, but also tests that they are out of order because pipelining is disabled // check if opaque ids are monotonically increasing
*/ int i = 0;
private void assertResponsesOutOfOrder(List<String> opaqueIds) { String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be monotonically increasing, got [%s]", opaqueIds);
String message = String.format(Locale.ROOT, "Expected returned http message ids to be in any order of: %s", opaqueIds); for (String opaqueId : opaqueIds) {
assertThat(message, opaqueIds, containsInAnyOrder("0", "1", "2", "3", "4", "5", "6")); assertThat(msg, opaqueId, is(String.valueOf(i++)));
}
} }
} }

View File

@ -227,7 +227,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
HttpTransportSettings.SETTING_CORS_ENABLED, HttpTransportSettings.SETTING_CORS_ENABLED,
HttpTransportSettings.SETTING_CORS_MAX_AGE, HttpTransportSettings.SETTING_CORS_MAX_AGE,
HttpTransportSettings.SETTING_HTTP_DETAILED_ERRORS_ENABLED, HttpTransportSettings.SETTING_HTTP_DETAILED_ERRORS_ENABLED,
HttpTransportSettings.SETTING_PIPELINING,
HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN, HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN,
HttpTransportSettings.SETTING_HTTP_HOST, HttpTransportSettings.SETTING_HTTP_HOST,
HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST, HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST,

View File

@ -29,9 +29,11 @@ public class HttpHandlingSettings {
private final boolean compression; private final boolean compression;
private final int compressionLevel; private final int compressionLevel;
private final boolean detailedErrorsEnabled; private final boolean detailedErrorsEnabled;
private final int pipeliningMaxEvents;
public HttpHandlingSettings(int maxContentLength, int maxChunkSize, int maxHeaderSize, int maxInitialLineLength, public HttpHandlingSettings(int maxContentLength, int maxChunkSize, int maxHeaderSize, int maxInitialLineLength,
boolean resetCookies, boolean compression, int compressionLevel, boolean detailedErrorsEnabled) { boolean resetCookies, boolean compression, int compressionLevel, boolean detailedErrorsEnabled,
int pipeliningMaxEvents) {
this.maxContentLength = maxContentLength; this.maxContentLength = maxContentLength;
this.maxChunkSize = maxChunkSize; this.maxChunkSize = maxChunkSize;
this.maxHeaderSize = maxHeaderSize; this.maxHeaderSize = maxHeaderSize;
@ -40,6 +42,7 @@ public class HttpHandlingSettings {
this.compression = compression; this.compression = compression;
this.compressionLevel = compressionLevel; this.compressionLevel = compressionLevel;
this.detailedErrorsEnabled = detailedErrorsEnabled; this.detailedErrorsEnabled = detailedErrorsEnabled;
this.pipeliningMaxEvents = pipeliningMaxEvents;
} }
public int getMaxContentLength() { public int getMaxContentLength() {
@ -73,4 +76,8 @@ public class HttpHandlingSettings {
public boolean getDetailedErrorsEnabled() { public boolean getDetailedErrorsEnabled() {
return detailedErrorsEnabled; return detailedErrorsEnabled;
} }
public int getPipeliningMaxEvents() {
return pipeliningMaxEvents;
}
} }

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
public class HttpPipelinedMessage implements Comparable<HttpPipelinedMessage> {
private final int sequence;
public HttpPipelinedMessage(int sequence) {
this.sequence = sequence;
}
public int getSequence() {
return sequence;
}
@Override
public int compareTo(HttpPipelinedMessage o) {
return Integer.compare(sequence, o.sequence);
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
public class HttpPipelinedRequest<R> extends HttpPipelinedMessage {
private final R request;
HttpPipelinedRequest(int sequence, R request) {
super(sequence);
this.request = request;
}
public R getRequest() {
return request;
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import org.elasticsearch.common.collect.Tuple;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
public class HttpPipeliningAggregator<Response extends HttpPipelinedMessage, Listener> {
private final int maxEventsHeld;
private final PriorityQueue<Tuple<Response, Listener>> outboundHoldingQueue;
/*
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
* channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the
* current write sequence, implying that all preceding messages have been written.
*/
private int readSequence;
private int writeSequence;
public HttpPipeliningAggregator(int maxEventsHeld) {
this.maxEventsHeld = maxEventsHeld;
this.outboundHoldingQueue = new PriorityQueue<>(1, Comparator.comparing(Tuple::v1));
}
public <Request> HttpPipelinedRequest<Request> read(final Request request) {
return new HttpPipelinedRequest<>(readSequence++, request);
}
public List<Tuple<Response, Listener>> write(final Response response, Listener listener) {
if (outboundHoldingQueue.size() < maxEventsHeld) {
ArrayList<Tuple<Response, Listener>> readyResponses = new ArrayList<>();
outboundHoldingQueue.add(new Tuple<>(response, listener));
while (!outboundHoldingQueue.isEmpty()) {
/*
* Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence
* number does not match the current write sequence number then we have not processed all preceding responses yet.
*/
final Tuple<Response, Listener> top = outboundHoldingQueue.peek();
if (top.v1().getSequence() != writeSequence) {
break;
}
outboundHoldingQueue.poll();
readyResponses.add(top);
writeSequence++;
}
return readyResponses;
} else {
int eventCount = outboundHoldingQueue.size() + 1;
throw new IllegalStateException("Too many pipelined events [" + eventCount + "]. Max events allowed ["
+ maxEventsHeld + "].");
}
}
public List<Tuple<Response, Listener>> removeAllInflightResponses() {
ArrayList<Tuple<Response, Listener>> responses = new ArrayList<>(outboundHoldingQueue);
outboundHoldingQueue.clear();
return responses;
}
}

View File

@ -49,8 +49,6 @@ public final class HttpTransportSettings {
new Setting<>("http.cors.allow-headers", "X-Requested-With,Content-Type,Content-Length", (value) -> value, Property.NodeScope); new Setting<>("http.cors.allow-headers", "X-Requested-With,Content-Type,Content-Length", (value) -> value, Property.NodeScope);
public static final Setting<Boolean> SETTING_CORS_ALLOW_CREDENTIALS = public static final Setting<Boolean> SETTING_CORS_ALLOW_CREDENTIALS =
Setting.boolSetting("http.cors.allow-credentials", false, Property.NodeScope); Setting.boolSetting("http.cors.allow-credentials", false, Property.NodeScope);
public static final Setting<Boolean> SETTING_PIPELINING =
Setting.boolSetting("http.pipelining", true, Property.NodeScope);
public static final Setting<Integer> SETTING_PIPELINING_MAX_EVENTS = public static final Setting<Integer> SETTING_PIPELINING_MAX_EVENTS =
Setting.intSetting("http.pipelining.max_events", 10000, Property.NodeScope); Setting.intSetting("http.pipelining.max_events", 10000, Property.NodeScope);
public static final Setting<Boolean> SETTING_HTTP_COMPRESSION = public static final Setting<Boolean> SETTING_HTTP_COMPRESSION =

View File

@ -300,7 +300,6 @@ public final class InternalTestCluster extends TestCluster {
builder.put(Environment.PATH_REPO_SETTING.getKey(), baseDir.resolve("repos")); builder.put(Environment.PATH_REPO_SETTING.getKey(), baseDir.resolve("repos"));
builder.put(TcpTransport.PORT.getKey(), 0); builder.put(TcpTransport.PORT.getKey(), 0);
builder.put("http.port", 0); builder.put("http.port", 0);
builder.put("http.pipelining", enableHttpPipelining);
if (Strings.hasLength(System.getProperty("tests.es.logger.level"))) { if (Strings.hasLength(System.getProperty("tests.es.logger.level"))) {
builder.put("logger.level", System.getProperty("tests.es.logger.level")); builder.put("logger.level", System.getProperty("tests.es.logger.level"));
} }