From 4fd0a3e4926a1ed48f63030d9d54c3eeddbbd662 Mon Sep 17 00:00:00 2001 From: Colin Goodheart-Smithe Date: Wed, 23 May 2018 17:54:46 +0100 Subject: [PATCH] Revert "Make http pipelining support mandatory (#30695)" (#30813) This reverts commit 31251c9 introduced in #30695. We suspect this commit is causing the OOME's reported in #30811 and we will use this PR to test this assertion. --- .../migration/migrate_7_0/settings.asciidoc | 10 +- docs/reference/modules/http.asciidoc | 2 + .../http/netty4/Netty4HttpChannel.java | 39 ++- .../netty4/Netty4HttpPipeliningHandler.java | 102 ------ .../http/netty4/Netty4HttpRequestHandler.java | 30 +- .../http/netty4/Netty4HttpResponse.java | 37 --- .../netty4/Netty4HttpServerTransport.java | 20 +- .../pipelining/HttpPipelinedRequest.java | 88 +++++ .../pipelining/HttpPipelinedResponse.java | 94 ++++++ .../pipelining/HttpPipeliningHandler.java | 144 +++++++++ .../http/netty4/Netty4HttpChannelTests.java | 19 +- .../Netty4HttpServerPipeliningTests.java | 82 ++++- .../netty4/Netty4PipeliningDisabledIT.java | 48 +-- ...IT.java => Netty4PipeliningEnabledIT.java} | 12 +- .../Netty4HttpPipeliningHandlerTests.java | 57 ++-- .../http/nio/HttpReadWriteHandler.java | 145 ++++----- .../http/nio/HttpWriteOperation.java | 7 +- .../elasticsearch/http/nio/NettyAdaptor.java | 20 +- .../elasticsearch/http/nio/NettyListener.java | 30 +- .../http/nio/NioHttpChannel.java | 10 +- .../http/nio/NioHttpPipeliningHandler.java | 103 ------ .../http/nio/NioHttpResponse.java | 37 --- .../http/nio/NioHttpServerTransport.java | 15 +- .../org/elasticsearch/NioIntegTestCase.java | 5 +- .../http/nio/HttpReadWriteHandlerTests.java | 11 +- .../nio/NioHttpPipeliningHandlerTests.java | 304 ------------------ .../common/settings/ClusterSettings.java | 1 + .../http/HttpHandlingSettings.java | 9 +- .../http/HttpPipelinedMessage.java | 37 --- .../http/HttpPipelinedRequest.java | 33 -- .../http/HttpPipeliningAggregator.java | 81 ----- .../http/HttpTransportSettings.java | 2 + .../single/SingleNodeDiscoveryIT.java | 1 + .../elasticsearch/test/ESIntegTestCase.java | 2 +- .../test/InternalTestCluster.java | 5 +- .../test/test/InternalTestClusterTests.java | 23 +- .../audit/index/IndexAuditTrailTests.java | 2 +- .../RemoteIndexAuditTrailStartingTests.java | 2 +- 38 files changed, 668 insertions(+), 1001 deletions(-) delete mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java delete mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpResponse.java create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipelinedRequest.java create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipelinedResponse.java create mode 100644 modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipeliningHandler.java rename plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioPipeliningIT.java => modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningDisabledIT.java (53%) rename modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/{Netty4PipeliningIT.java => Netty4PipeliningEnabledIT.java} (87%) rename modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/{ => pipelining}/Netty4HttpPipeliningHandlerTests.java (83%) delete mode 100644 plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java delete mode 100644 plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpResponse.java delete mode 100644 plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java delete mode 100644 server/src/main/java/org/elasticsearch/http/HttpPipelinedMessage.java delete mode 100644 server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java delete mode 100644 server/src/main/java/org/elasticsearch/http/HttpPipeliningAggregator.java diff --git a/docs/reference/migration/migrate_7_0/settings.asciidoc b/docs/reference/migration/migrate_7_0/settings.asciidoc index 7826afc05fa..d62d7e6065d 100644 --- a/docs/reference/migration/migrate_7_0/settings.asciidoc +++ b/docs/reference/migration/migrate_7_0/settings.asciidoc @@ -29,14 +29,6 @@ [[remove-http-enabled]] ==== Http enabled setting removed -* The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing +The setting `http.enabled` previously allowed disabling binding to HTTP, only allowing use of the transport client. This setting has been removed, as the transport client will be removed in the future, thus requiring HTTP to always be enabled. - -[[remove-http-pipelining-setting]] -==== Http pipelining setting removed - -* The setting `http.pipelining` previously allowed disabling HTTP pipelining support. -This setting has been removed, as disabling http pipelining support on the server -provided little value. The setting `http.pipelining.max_events` can still be used to -limit the number of pipelined requests in-flight. diff --git a/docs/reference/modules/http.asciidoc b/docs/reference/modules/http.asciidoc index dab8e813689..7f29a9db7f6 100644 --- a/docs/reference/modules/http.asciidoc +++ b/docs/reference/modules/http.asciidoc @@ -96,6 +96,8 @@ and stack traces in response output. Note: When set to `false` and the `error_tr parameter is specified, an error will be returned; when `error_trace` is not specified, a simple message will be returned. Defaults to `true` +|`http.pipelining` |Enable or disable HTTP pipelining, defaults to `true`. + |`http.pipelining.max_events` |The maximum number of events to be queued up in memory before a HTTP connection is closed, defaults to `10000`. |`http.max_warning_header_count` |The maximum number of warning headers in diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index cb31d444544..6e39a7f50d2 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; +import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; @@ -58,24 +59,29 @@ final class Netty4HttpChannel extends AbstractRestChannel { private final Netty4HttpServerTransport transport; private final Channel channel; private final FullHttpRequest nettyRequest; - private final int sequence; + private final HttpPipelinedRequest pipelinedRequest; private final ThreadContext threadContext; private final HttpHandlingSettings handlingSettings; /** - * @param transport The corresponding NettyHttpServerTransport where this channel belongs to. - * @param request The request that is handled by this channel. - * @param sequence The pipelining sequence number for this request - * @param handlingSettings true if error messages should include stack traces. - * @param threadContext the thread context for the channel + * @param transport The corresponding NettyHttpServerTransport where this channel belongs to. + * @param request The request that is handled by this channel. + * @param pipelinedRequest If HTTP pipelining is enabled provide the corresponding pipelined request. May be null if + * HTTP pipelining is disabled. + * @param handlingSettings true iff error messages should include stack traces. + * @param threadContext the thread context for the channel */ - Netty4HttpChannel(Netty4HttpServerTransport transport, Netty4HttpRequest request, int sequence, HttpHandlingSettings handlingSettings, - ThreadContext threadContext) { + Netty4HttpChannel( + final Netty4HttpServerTransport transport, + final Netty4HttpRequest request, + final HttpPipelinedRequest pipelinedRequest, + final HttpHandlingSettings handlingSettings, + final ThreadContext threadContext) { super(request, handlingSettings.getDetailedErrorsEnabled()); this.transport = transport; this.channel = request.getChannel(); this.nettyRequest = request.request(); - this.sequence = sequence; + this.pipelinedRequest = pipelinedRequest; this.threadContext = threadContext; this.handlingSettings = handlingSettings; } @@ -123,7 +129,7 @@ final class Netty4HttpChannel extends AbstractRestChannel { final ChannelPromise promise = channel.newPromise(); if (releaseContent) { - promise.addListener(f -> ((Releasable) content).close()); + promise.addListener(f -> ((Releasable)content).close()); } if (releaseBytesStreamOutput) { @@ -134,9 +140,13 @@ final class Netty4HttpChannel extends AbstractRestChannel { promise.addListener(ChannelFutureListener.CLOSE); } - Netty4HttpResponse newResponse = new Netty4HttpResponse(sequence, resp); - - channel.writeAndFlush(newResponse, promise); + final Object msg; + if (pipelinedRequest != null) { + msg = pipelinedRequest.createHttpResponse(resp, promise); + } else { + msg = resp; + } + channel.writeAndFlush(msg, promise); releaseContent = false; releaseBytesStreamOutput = false; } finally { @@ -146,6 +156,9 @@ final class Netty4HttpChannel extends AbstractRestChannel { if (releaseBytesStreamOutput) { bytesOutputOrNull().close(); } + if (pipelinedRequest != null) { + pipelinedRequest.release(); + } } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java deleted file mode 100644 index e930ffe7424..00000000000 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.http.netty4; - -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.LastHttpContent; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.http.HttpPipelinedRequest; -import org.elasticsearch.http.HttpPipeliningAggregator; -import org.elasticsearch.transport.netty4.Netty4Utils; - -import java.nio.channels.ClosedChannelException; -import java.util.Collections; -import java.util.List; - -/** - * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests. - */ -public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler { - - private final Logger logger; - private final HttpPipeliningAggregator aggregator; - - /** - * Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation. - * - * @param logger for logging unexpected errors - * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is - * required as events cannot queue up indefinitely - */ - public Netty4HttpPipeliningHandler(Logger logger, final int maxEventsHeld) { - this.logger = logger; - this.aggregator = new HttpPipeliningAggregator<>(maxEventsHeld); - } - - @Override - public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - if (msg instanceof LastHttpContent) { - HttpPipelinedRequest pipelinedRequest = aggregator.read(((LastHttpContent) msg).retain()); - ctx.fireChannelRead(pipelinedRequest); - } else { - ctx.fireChannelRead(msg); - } - } - - @Override - public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - assert msg instanceof Netty4HttpResponse : "Message must be type: " + Netty4HttpResponse.class; - Netty4HttpResponse response = (Netty4HttpResponse) msg; - boolean success = false; - try { - List> readyResponses = aggregator.write(response, promise); - for (Tuple readyResponse : readyResponses) { - ctx.write(readyResponse.v1().getResponse(), readyResponse.v2()); - } - success = true; - } catch (IllegalStateException e) { - ctx.channel().close(); - } finally { - if (success == false) { - promise.setFailure(new ClosedChannelException()); - } - } - } - - @Override - public void close(ChannelHandlerContext ctx, ChannelPromise promise) { - List> inflightResponses = aggregator.removeAllInflightResponses(); - - if (inflightResponses.isEmpty() == false) { - ClosedChannelException closedChannelException = new ClosedChannelException(); - for (Tuple inflightResponse : inflightResponses) { - try { - inflightResponse.v2().setFailure(closedChannelException); - } catch (RuntimeException e) { - logger.error("unexpected error while releasing pipelined http responses", e); - } - } - } - ctx.close(promise); - } -} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index c3a010226a4..74429c8dda9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -30,30 +30,41 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpHeaders; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.HttpHandlingSettings; -import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.transport.netty4.Netty4Utils; import java.util.Collections; @ChannelHandler.Sharable -class Netty4HttpRequestHandler extends SimpleChannelInboundHandler> { +class Netty4HttpRequestHandler extends SimpleChannelInboundHandler { private final Netty4HttpServerTransport serverTransport; private final HttpHandlingSettings handlingSettings; + private final boolean httpPipeliningEnabled; private final ThreadContext threadContext; Netty4HttpRequestHandler(Netty4HttpServerTransport serverTransport, HttpHandlingSettings handlingSettings, ThreadContext threadContext) { this.serverTransport = serverTransport; + this.httpPipeliningEnabled = serverTransport.pipelining; this.handlingSettings = handlingSettings; this.threadContext = threadContext; } @Override - protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest msg) throws Exception { - final FullHttpRequest request = msg.getRequest(); + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + final FullHttpRequest request; + final HttpPipelinedRequest pipelinedRequest; + if (this.httpPipeliningEnabled && msg instanceof HttpPipelinedRequest) { + pipelinedRequest = (HttpPipelinedRequest) msg; + request = (FullHttpRequest) pipelinedRequest.last(); + } else { + pipelinedRequest = null; + request = (FullHttpRequest) msg; + } + boolean success = false; try { final FullHttpRequest copy = @@ -100,7 +111,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler, ReferenceCounted { + + private final FullHttpResponse response; + private final ChannelPromise promise; + private final int sequence; + + HttpPipelinedResponse(FullHttpResponse response, ChannelPromise promise, int sequence) { + this.response = response; + this.promise = promise; + this.sequence = sequence; + } + + public FullHttpResponse response() { + return response; + } + + public ChannelPromise promise() { + return promise; + } + + public int sequence() { + return sequence; + } + + @Override + public int compareTo(HttpPipelinedResponse o) { + return Integer.compare(sequence, o.sequence); + } + + @Override + public int refCnt() { + return response.refCnt(); + } + + @Override + public ReferenceCounted retain() { + response.retain(); + return this; + } + + @Override + public ReferenceCounted retain(int increment) { + response.retain(increment); + return this; + } + + @Override + public ReferenceCounted touch() { + response.touch(); + return this; + } + + @Override + public ReferenceCounted touch(Object hint) { + response.touch(hint); + return this; + } + + @Override + public boolean release() { + return response.release(); + } + + @Override + public boolean release(int decrement) { + return response.release(decrement); + } + +} diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipeliningHandler.java new file mode 100644 index 00000000000..a90027c8148 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipeliningHandler.java @@ -0,0 +1,144 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.http.netty4.pipelining; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.LastHttpContent; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.transport.netty4.Netty4Utils; + +import java.nio.channels.ClosedChannelException; +import java.util.Collections; +import java.util.PriorityQueue; + +/** + * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests. + */ +public class HttpPipeliningHandler extends ChannelDuplexHandler { + + // we use a priority queue so that responses are ordered by their sequence number + private final PriorityQueue holdingQueue; + + private final Logger logger; + private final int maxEventsHeld; + + /* + * The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the + * channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the + * current write sequence, implying that all preceding messages have been written. + */ + private int readSequence; + private int writeSequence; + + /** + * Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation. + * + * @param logger for logging unexpected errors + * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is + * required as events cannot queue up indefinitely + */ + public HttpPipeliningHandler(Logger logger, final int maxEventsHeld) { + this.logger = logger; + this.maxEventsHeld = maxEventsHeld; + this.holdingQueue = new PriorityQueue<>(1); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { + if (msg instanceof LastHttpContent) { + ctx.fireChannelRead(new HttpPipelinedRequest(((LastHttpContent) msg).retain(), readSequence++)); + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception { + if (msg instanceof HttpPipelinedResponse) { + final HttpPipelinedResponse current = (HttpPipelinedResponse) msg; + /* + * We attach the promise to the response. When we invoke a write on the channel with the response, we must ensure that we invoke + * the write methods that accept the same promise that we have attached to the response otherwise as the response proceeds + * through the handler pipeline a different promise will be used until reaching this handler. Therefore, we assert here that the + * attached promise is identical to the provided promise as a safety mechanism that we are respecting this. + */ + assert current.promise() == promise; + + boolean channelShouldClose = false; + + synchronized (holdingQueue) { + if (holdingQueue.size() < maxEventsHeld) { + holdingQueue.add(current); + + while (!holdingQueue.isEmpty()) { + /* + * Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence + * number does not match the current write sequence number then we have not processed all preceding responses yet. + */ + final HttpPipelinedResponse top = holdingQueue.peek(); + if (top.sequence() != writeSequence) { + break; + } + holdingQueue.remove(); + /* + * We must use the promise attached to the response; this is necessary since are going to hold a response until all + * responses that precede it in the pipeline are written first. Note that the promise from the method invocation is + * not ignored, it will already be attached to an existing response and consumed when that response is drained. + */ + ctx.write(top.response(), top.promise()); + writeSequence++; + } + } else { + channelShouldClose = true; + } + } + + if (channelShouldClose) { + try { + Netty4Utils.closeChannels(Collections.singletonList(ctx.channel())); + } finally { + current.release(); + promise.setSuccess(); + } + } + } else { + ctx.write(msg, promise); + } + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + if (holdingQueue.isEmpty() == false) { + ClosedChannelException closedChannelException = new ClosedChannelException(); + HttpPipelinedResponse pipelinedResponse; + while ((pipelinedResponse = holdingQueue.poll()) != null) { + try { + pipelinedResponse.release(); + pipelinedResponse.promise().setFailure(closedChannelException); + } catch (Exception e) { + logger.error("unexpected error while releasing pipelined http responses", e); + } + } + } + ctx.close(promise); + } +} diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java index 7c5b35a3229..0ef1ea585b1 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java @@ -60,6 +60,7 @@ import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpTransportSettings; import org.elasticsearch.http.NullDispatcher; import org.elasticsearch.http.netty4.cors.Netty4CorsHandler; +import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestResponse; @@ -211,12 +212,12 @@ public class Netty4HttpChannelTests extends ESTestCase { final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); httpRequest.headers().add(HttpHeaderNames.ORIGIN, "remote"); final WriteCapturingChannel writeCapturingChannel = new WriteCapturingChannel(); - final Netty4HttpRequest request = new Netty4HttpRequest(xContentRegistry(), httpRequest, writeCapturingChannel); + Netty4HttpRequest request = new Netty4HttpRequest(xContentRegistry(), httpRequest, writeCapturingChannel); HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; // send a response Netty4HttpChannel channel = - new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext()); + new Netty4HttpChannel(httpServerTransport, request, null, handlingSettings, threadPool.getThreadContext()); TestResponse resp = new TestResponse(); final String customHeader = "custom-header"; final String customHeaderValue = "xyz"; @@ -226,7 +227,7 @@ public class Netty4HttpChannelTests extends ESTestCase { // inspect what was written List writtenObjects = writeCapturingChannel.getWrittenObjects(); assertThat(writtenObjects.size(), is(1)); - HttpResponse response = ((Netty4HttpResponse) writtenObjects.get(0)).getResponse(); + HttpResponse response = (HttpResponse) writtenObjects.get(0); assertThat(response.headers().get("non-existent-header"), nullValue()); assertThat(response.headers().get(customHeader), equalTo(customHeaderValue)); assertThat(response.headers().get(HttpHeaderNames.CONTENT_LENGTH), equalTo(Integer.toString(resp.content().length()))); @@ -242,9 +243,10 @@ public class Netty4HttpChannelTests extends ESTestCase { final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); final EmbeddedChannel embeddedChannel = new EmbeddedChannel(); final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel); + final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null; HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; final Netty4HttpChannel channel = - new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext()); + new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, handlingSettings, threadPool.getThreadContext()); final TestResponse response = new TestResponse(bigArrays); assertThat(response.content(), instanceOf(Releasable.class)); embeddedChannel.close(); @@ -261,9 +263,10 @@ public class Netty4HttpChannelTests extends ESTestCase { final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); final EmbeddedChannel embeddedChannel = new EmbeddedChannel(); final Netty4HttpRequest request = new Netty4HttpRequest(registry, httpRequest, embeddedChannel); + final HttpPipelinedRequest pipelinedRequest = randomBoolean() ? new HttpPipelinedRequest(request.request(), 1) : null; HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; final Netty4HttpChannel channel = - new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext()); + new Netty4HttpChannel(httpServerTransport, request, pipelinedRequest, handlingSettings, threadPool.getThreadContext()); final BytesRestResponse response = new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, JsonXContent.contentBuilder().startObject().endObject()); assertThat(response.content(), not(instanceOf(Releasable.class))); @@ -309,7 +312,7 @@ public class Netty4HttpChannelTests extends ESTestCase { assertTrue(embeddedChannel.isOpen()); HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; final Netty4HttpChannel channel = - new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext()); + new Netty4HttpChannel(httpServerTransport, request, null, handlingSettings, threadPool.getThreadContext()); final TestResponse resp = new TestResponse(); channel.sendResponse(resp); assertThat(embeddedChannel.isOpen(), equalTo(!close)); @@ -337,13 +340,13 @@ public class Netty4HttpChannelTests extends ESTestCase { HttpHandlingSettings handlingSettings = httpServerTransport.httpHandlingSettings; Netty4HttpChannel channel = - new Netty4HttpChannel(httpServerTransport, request, 1, handlingSettings, threadPool.getThreadContext()); + new Netty4HttpChannel(httpServerTransport, request, null, handlingSettings, threadPool.getThreadContext()); channel.sendResponse(new TestResponse()); // get the response List writtenObjects = writeCapturingChannel.getWrittenObjects(); assertThat(writtenObjects.size(), is(1)); - return ((Netty4HttpResponse) writtenObjects.get(0)).getResponse(); + return (FullHttpResponse) writtenObjects.get(0); } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index f2b28b90918..0eb14a8a76e 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -38,9 +38,9 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.http.HttpPipelinedRequest; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.NullDispatcher; +import org.elasticsearch.http.netty4.pipelining.HttpPipelinedRequest; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -52,11 +52,16 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.elasticsearch.test.hamcrest.RegexMatcher.matches; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasSize; /** * This test just tests, if he pipelining works in general with out any connection the Elasticsearch handler @@ -80,8 +85,9 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { } } - public void testThatHttpPipeliningWorks() throws Exception { + public void testThatHttpPipeliningWorksWhenEnabled() throws Exception { final Settings settings = Settings.builder() + .put("http.pipelining", true) .put("http.port", "0") .build(); try (HttpServerTransport httpServerTransport = new CustomNettyHttpServerTransport(settings)) { @@ -106,6 +112,48 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { } } + public void testThatHttpPipeliningCanBeDisabled() throws Exception { + final Settings settings = Settings.builder() + .put("http.pipelining", false) + .put("http.port", "0") + .build(); + try (HttpServerTransport httpServerTransport = new CustomNettyHttpServerTransport(settings)) { + httpServerTransport.start(); + final TransportAddress transportAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()); + + final int numberOfRequests = randomIntBetween(4, 16); + final Set slowIds = new HashSet<>(); + final List requests = new ArrayList<>(numberOfRequests); + for (int i = 0; i < numberOfRequests; i++) { + if (rarely()) { + requests.add("/slow/" + i); + slowIds.add(i); + } else { + requests.add("/" + i); + } + } + + try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) { + Collection responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{})); + List responseBodies = new ArrayList<>(Netty4HttpClient.returnHttpResponseBodies(responses)); + // we can not be sure about the order of the responses, but the slow ones should come last + assertThat(responseBodies, hasSize(numberOfRequests)); + for (int i = 0; i < numberOfRequests - slowIds.size(); i++) { + assertThat(responseBodies.get(i), matches("/\\d+")); + } + + final Set ids = new HashSet<>(); + for (int i = 0; i < slowIds.size(); i++) { + final String response = responseBodies.get(numberOfRequests - slowIds.size() + i); + assertThat(response, matches("/slow/\\d+" )); + assertTrue(ids.add(Integer.parseInt(response.split("/")[2]))); + } + + assertThat(slowIds, equalTo(ids)); + } + } + } + class CustomNettyHttpServerTransport extends Netty4HttpServerTransport { private final ExecutorService executorService = Executors.newCachedThreadPool(); @@ -148,7 +196,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { } - class PossiblySlowUpstreamHandler extends SimpleChannelInboundHandler> { + class PossiblySlowUpstreamHandler extends SimpleChannelInboundHandler { private final ExecutorService executorService; @@ -157,7 +205,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { } @Override - protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest msg) throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { executorService.submit(new PossiblySlowRunnable(ctx, msg)); } @@ -172,18 +220,26 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { class PossiblySlowRunnable implements Runnable { private ChannelHandlerContext ctx; - private HttpPipelinedRequest pipelinedRequest; + private HttpPipelinedRequest pipelinedRequest; private FullHttpRequest fullHttpRequest; - PossiblySlowRunnable(ChannelHandlerContext ctx, HttpPipelinedRequest msg) { + PossiblySlowRunnable(ChannelHandlerContext ctx, Object msg) { this.ctx = ctx; - this.pipelinedRequest = msg; - this.fullHttpRequest = pipelinedRequest.getRequest(); + if (msg instanceof HttpPipelinedRequest) { + this.pipelinedRequest = (HttpPipelinedRequest) msg; + } else if (msg instanceof FullHttpRequest) { + this.fullHttpRequest = (FullHttpRequest) msg; + } } @Override public void run() { - final String uri = fullHttpRequest.uri(); + final String uri; + if (pipelinedRequest != null && pipelinedRequest.last() instanceof FullHttpRequest) { + uri = ((FullHttpRequest) pipelinedRequest.last()).uri(); + } else { + uri = fullHttpRequest.uri(); + } final ByteBuf buffer = Unpooled.copiedBuffer(uri, StandardCharsets.UTF_8); @@ -202,7 +258,13 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { } final ChannelPromise promise = ctx.newPromise(); - ctx.writeAndFlush(new Netty4HttpResponse(pipelinedRequest.getSequence(), httpResponse), promise); + final Object msg; + if (pipelinedRequest != null) { + msg = pipelinedRequest.createHttpResponse(httpResponse, promise); + } else { + msg = httpResponse; + } + ctx.writeAndFlush(msg, promise); } } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioPipeliningIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningDisabledIT.java similarity index 53% rename from plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioPipeliningIT.java rename to modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningDisabledIT.java index 074aafd6eab..af0e7c85a8f 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioPipeliningIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningDisabledIT.java @@ -16,53 +16,65 @@ * specific language governing permissions and limitations * under the License. */ - -package org.elasticsearch.http.nio; +package org.elasticsearch.http.netty4; import io.netty.handler.codec.http.FullHttpResponse; -import org.elasticsearch.NioIntegTestCase; +import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Locale; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.is; @ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) -public class NioPipeliningIT extends NioIntegTestCase { +public class Netty4PipeliningDisabledIT extends ESNetty4IntegTestCase { @Override protected boolean addMockHttpTransport() { return false; // enable http } - public void testThatNioHttpServerSupportsPipelining() throws Exception { - String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"}; + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("http.pipelining", false) + .build(); + } + + public void testThatNettyHttpServerDoesNotSupportPipelining() throws Exception { + ensureGreen(); + String[] requests = new String[] {"/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/"}; HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); - TransportAddress transportAddress = randomFrom(boundAddresses); + TransportAddress transportAddress = (TransportAddress) randomFrom(boundAddresses); try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) { Collection responses = nettyHttpClient.get(transportAddress.address(), requests); - assertThat(responses, hasSize(5)); + assertThat(responses, hasSize(requests.length)); - Collection opaqueIds = Netty4HttpClient.returnOpaqueIds(responses); - assertOpaqueIdsInOrder(opaqueIds); + List opaqueIds = new ArrayList<>(Netty4HttpClient.returnOpaqueIds(responses)); + + assertResponsesOutOfOrder(opaqueIds); } } - private void assertOpaqueIdsInOrder(Collection opaqueIds) { - // check if opaque ids are monotonically increasing - int i = 0; - String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be monotonically increasing, got [%s]", opaqueIds); - for (String opaqueId : opaqueIds) { - assertThat(msg, opaqueId, is(String.valueOf(i++))); - } + /** + * checks if all responses are there, but also tests that they are out of order because pipelining is disabled + */ + private void assertResponsesOutOfOrder(List opaqueIds) { + String message = String.format(Locale.ROOT, "Expected returned http message ids to be in any order of: %s", opaqueIds); + assertThat(message, opaqueIds, containsInAnyOrder("0", "1", "2", "3", "4", "5", "6")); } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningEnabledIT.java similarity index 87% rename from modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java rename to modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningEnabledIT.java index ebb91d9663e..9723ee93faf 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4PipeliningEnabledIT.java @@ -21,6 +21,8 @@ package org.elasticsearch.http.netty4; import io.netty.handler.codec.http.FullHttpResponse; import org.elasticsearch.ESNetty4IntegTestCase; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; @@ -33,13 +35,21 @@ import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1) -public class Netty4PipeliningIT extends ESNetty4IntegTestCase { +public class Netty4PipeliningEnabledIT extends ESNetty4IntegTestCase { @Override protected boolean addMockHttpTransport() { return false; // enable http } + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("http.pipelining", true) + .build(); + } + public void testThatNettyHttpServerSupportsPipelining() throws Exception { String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"}; diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/pipelining/Netty4HttpPipeliningHandlerTests.java similarity index 83% rename from modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java rename to modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/pipelining/Netty4HttpPipeliningHandlerTests.java index 21151304424..ffb6c8fb356 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandlerTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/pipelining/Netty4HttpPipeliningHandlerTests.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.http.netty4; +package org.elasticsearch.http.netty4.pipelining; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; @@ -37,7 +37,6 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.QueryStringDecoder; import org.elasticsearch.common.Randomness; -import org.elasticsearch.http.HttpPipelinedRequest; import org.elasticsearch.test.ESTestCase; import org.junit.After; @@ -63,8 +62,7 @@ import static org.hamcrest.core.Is.is; public class Netty4HttpPipeliningHandlerTests extends ESTestCase { - private final ExecutorService handlerService = Executors.newFixedThreadPool(randomIntBetween(4, 8)); - private final ExecutorService eventLoopService = Executors.newFixedThreadPool(1); + private final ExecutorService executorService = Executors.newFixedThreadPool(randomIntBetween(4, 8)); private final Map waitingRequests = new ConcurrentHashMap<>(); private final Map finishingRequests = new ConcurrentHashMap<>(); @@ -81,19 +79,15 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase { } private void shutdownExecutorService() throws InterruptedException { - if (!handlerService.isShutdown()) { - handlerService.shutdown(); - handlerService.awaitTermination(10, TimeUnit.SECONDS); - } - if (!eventLoopService.isShutdown()) { - eventLoopService.shutdown(); - eventLoopService.awaitTermination(10, TimeUnit.SECONDS); + if (!executorService.isShutdown()) { + executorService.shutdown(); + executorService.awaitTermination(10, TimeUnit.SECONDS); } } public void testThatPipeliningWorksWithFastSerializedRequests() throws InterruptedException { final int numberOfRequests = randomIntBetween(2, 128); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests), + final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests), new WorkEmulatorHandler()); for (int i = 0; i < numberOfRequests; i++) { @@ -120,7 +114,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase { public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws InterruptedException { final int numberOfRequests = randomIntBetween(2, 128); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests), + final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests), new WorkEmulatorHandler()); for (int i = 0; i < numberOfRequests; i++) { @@ -153,7 +147,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase { final EmbeddedChannel embeddedChannel = new EmbeddedChannel( new AggregateUrisAndHeadersHandler(), - new Netty4HttpPipeliningHandler(logger, numberOfRequests), + new HttpPipeliningHandler(logger, numberOfRequests), new WorkEmulatorHandler()); for (int i = 0; i < numberOfRequests; i++) { @@ -182,7 +176,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase { public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException { final int numberOfRequests = randomIntBetween(2, 128); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests), + final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests), new WorkEmulatorHandler()); for (int i = 0; i < 1 + numberOfRequests + 1; i++) { @@ -190,7 +184,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase { } final List latches = new ArrayList<>(); - final List requests = IntStream.range(1, numberOfRequests + 1).boxed().collect(Collectors.toList()); + final List requests = IntStream.range(1, numberOfRequests + 1).mapToObj(r -> r).collect(Collectors.toList()); Randomness.shuffle(requests); for (final Integer request : requests) { @@ -211,26 +205,25 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase { public void testPipeliningRequestsAreReleased() throws InterruptedException { final int numberOfRequests = 10; final EmbeddedChannel embeddedChannel = - new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests + 1)); + new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests + 1)); for (int i = 0; i < numberOfRequests; i++) { embeddedChannel.writeInbound(createHttpRequest("/" + i)); } - HttpPipelinedRequest inbound; - ArrayList> requests = new ArrayList<>(); + HttpPipelinedRequest inbound; + ArrayList requests = new ArrayList<>(); while ((inbound = embeddedChannel.readInbound()) != null) { requests.add(inbound); } ArrayList promises = new ArrayList<>(); for (int i = 1; i < requests.size(); ++i) { - final FullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK); + final DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK); ChannelPromise promise = embeddedChannel.newPromise(); promises.add(promise); - int sequence = requests.get(i).getSequence(); - Netty4HttpResponse resp = new Netty4HttpResponse(sequence, httpResponse); - embeddedChannel.writeAndFlush(resp, promise); + HttpPipelinedResponse response = requests.get(i).createHttpResponse(httpResponse, promise); + embeddedChannel.writeAndFlush(response, promise); } for (ChannelPromise promise : promises) { @@ -267,14 +260,14 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase { } - private class WorkEmulatorHandler extends SimpleChannelInboundHandler> { + private class WorkEmulatorHandler extends SimpleChannelInboundHandler { @Override - protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) { - LastHttpContent request = pipelinedRequest.getRequest(); + protected void channelRead0(final ChannelHandlerContext ctx, final HttpPipelinedRequest pipelinedRequest) throws Exception { final QueryStringDecoder decoder; - if (request instanceof FullHttpRequest) { - decoder = new QueryStringDecoder(((FullHttpRequest)request).uri()); + if (pipelinedRequest.last() instanceof FullHttpRequest) { + final FullHttpRequest fullHttpRequest = (FullHttpRequest) pipelinedRequest.last(); + decoder = new QueryStringDecoder(fullHttpRequest.uri()); } else { decoder = new QueryStringDecoder(AggregateUrisAndHeadersHandler.QUEUE_URI.poll()); } @@ -289,14 +282,12 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase { final CountDownLatch finishingLatch = new CountDownLatch(1); finishingRequests.put(uri, finishingLatch); - handlerService.submit(() -> { + executorService.submit(() -> { try { waitingLatch.await(1000, TimeUnit.SECONDS); final ChannelPromise promise = ctx.newPromise(); - eventLoopService.submit(() -> { - ctx.write(new Netty4HttpResponse(pipelinedRequest.getSequence(), httpResponse), promise); - finishingLatch.countDown(); - }); + ctx.write(pipelinedRequest.createHttpResponse(httpResponse, promise), promise); + finishingLatch.countDown(); } catch (InterruptedException e) { fail(e.toString()); } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java index e3481e3c254..f1d18ddacbd 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpReadWriteHandler.java @@ -25,21 +25,20 @@ import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.HttpHandlingSettings; -import org.elasticsearch.http.HttpPipelinedRequest; import org.elasticsearch.nio.FlushOperation; import org.elasticsearch.nio.InboundChannelBuffer; -import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ReadWriteHandler; +import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.WriteOperation; import org.elasticsearch.rest.RestRequest; @@ -78,7 +77,6 @@ public class HttpReadWriteHandler implements ReadWriteHandler { if (settings.isCompression()) { handlers.add(new HttpContentCompressor(settings.getCompressionLevel())); } - handlers.add(new NioHttpPipeliningHandler(transport.getLogger(), settings.getPipeliningMaxEvents())); adaptor = new NettyAdaptor(handlers.toArray(new ChannelHandler[0])); adaptor.addCloseListener((v, e) -> nioChannel.close()); @@ -97,9 +95,9 @@ public class HttpReadWriteHandler implements ReadWriteHandler { @Override public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer listener) { - assert message instanceof NioHttpResponse : "This channel only supports messages that are of type: " - + NioHttpResponse.class + ". Found type: " + message.getClass() + "."; - return new HttpWriteOperation(context, (NioHttpResponse) message, listener); + assert message instanceof FullHttpResponse : "This channel only supports messages that are of type: " + FullHttpResponse.class + + ". Found type: " + message.getClass() + "."; + return new HttpWriteOperation(context, (FullHttpResponse) message, listener); } @Override @@ -127,85 +125,76 @@ public class HttpReadWriteHandler implements ReadWriteHandler { } } - @SuppressWarnings("unchecked") private void handleRequest(Object msg) { - final HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest) msg; - FullHttpRequest request = pipelinedRequest.getRequest(); + final FullHttpRequest request = (FullHttpRequest) msg; - try { - final FullHttpRequest copiedRequest = - new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - Unpooled.copiedBuffer(request.content()), - request.headers(), - request.trailingHeaders()); + final FullHttpRequest copiedRequest = + new DefaultFullHttpRequest( + request.protocolVersion(), + request.method(), + request.uri(), + Unpooled.copiedBuffer(request.content()), + request.headers(), + request.trailingHeaders()); - Exception badRequestCause = null; + Exception badRequestCause = null; - /* - * We want to create a REST request from the incoming request from Netty. However, creating this request could fail if there - * are incorrectly encoded parameters, or the Content-Type header is invalid. If one of these specific failures occurs, we - * attempt to create a REST request again without the input that caused the exception (e.g., we remove the Content-Type header, - * or skip decoding the parameters). Once we have a request in hand, we then dispatch the request as a bad request with the - * underlying exception that caused us to treat the request as bad. - */ - final NioHttpRequest httpRequest; - { - NioHttpRequest innerHttpRequest; - try { - innerHttpRequest = new NioHttpRequest(xContentRegistry, copiedRequest); - } catch (final RestRequest.ContentTypeHeaderException e) { + /* + * We want to create a REST request from the incoming request from Netty. However, creating this request could fail if there + * are incorrectly encoded parameters, or the Content-Type header is invalid. If one of these specific failures occurs, we + * attempt to create a REST request again without the input that caused the exception (e.g., we remove the Content-Type header, + * or skip decoding the parameters). Once we have a request in hand, we then dispatch the request as a bad request with the + * underlying exception that caused us to treat the request as bad. + */ + final NioHttpRequest httpRequest; + { + NioHttpRequest innerHttpRequest; + try { + innerHttpRequest = new NioHttpRequest(xContentRegistry, copiedRequest); + } catch (final RestRequest.ContentTypeHeaderException e) { + badRequestCause = e; + innerHttpRequest = requestWithoutContentTypeHeader(copiedRequest, badRequestCause); + } catch (final RestRequest.BadParameterException e) { + badRequestCause = e; + innerHttpRequest = requestWithoutParameters(copiedRequest); + } + httpRequest = innerHttpRequest; + } + + /* + * We now want to create a channel used to send the response on. However, creating this channel can fail if there are invalid + * parameter values for any of the filter_path, human, or pretty parameters. We detect these specific failures via an + * IllegalArgumentException from the channel constructor and then attempt to create a new channel that bypasses parsing of these + * parameter values. + */ + final NioHttpChannel channel; + { + NioHttpChannel innerChannel; + try { + innerChannel = new NioHttpChannel(nioChannel, transport.getBigArrays(), httpRequest, settings, threadContext); + } catch (final IllegalArgumentException e) { + if (badRequestCause == null) { badRequestCause = e; - innerHttpRequest = requestWithoutContentTypeHeader(copiedRequest, badRequestCause); - } catch (final RestRequest.BadParameterException e) { - badRequestCause = e; - innerHttpRequest = requestWithoutParameters(copiedRequest); + } else { + badRequestCause.addSuppressed(e); } - httpRequest = innerHttpRequest; + final NioHttpRequest innerRequest = + new NioHttpRequest( + xContentRegistry, + Collections.emptyMap(), // we are going to dispatch the request as a bad request, drop all parameters + copiedRequest.uri(), + copiedRequest); + innerChannel = new NioHttpChannel(nioChannel, transport.getBigArrays(), innerRequest, settings, threadContext); } + channel = innerChannel; + } - /* - * We now want to create a channel used to send the response on. However, creating this channel can fail if there are invalid - * parameter values for any of the filter_path, human, or pretty parameters. We detect these specific failures via an - * IllegalArgumentException from the channel constructor and then attempt to create a new channel that bypasses parsing of - * these parameter values. - */ - final NioHttpChannel channel; - { - NioHttpChannel innerChannel; - int sequence = pipelinedRequest.getSequence(); - BigArrays bigArrays = transport.getBigArrays(); - try { - innerChannel = new NioHttpChannel(nioChannel, bigArrays, httpRequest, sequence, settings, threadContext); - } catch (final IllegalArgumentException e) { - if (badRequestCause == null) { - badRequestCause = e; - } else { - badRequestCause.addSuppressed(e); - } - final NioHttpRequest innerRequest = - new NioHttpRequest( - xContentRegistry, - Collections.emptyMap(), // we are going to dispatch the request as a bad request, drop all parameters - copiedRequest.uri(), - copiedRequest); - innerChannel = new NioHttpChannel(nioChannel, bigArrays, innerRequest, sequence, settings, threadContext); - } - channel = innerChannel; - } - - if (request.decoderResult().isFailure()) { - transport.dispatchBadRequest(httpRequest, channel, request.decoderResult().cause()); - } else if (badRequestCause != null) { - transport.dispatchBadRequest(httpRequest, channel, badRequestCause); - } else { - transport.dispatchRequest(httpRequest, channel); - } - } finally { - // As we have copied the buffer, we can release the request - request.release(); + if (request.decoderResult().isFailure()) { + transport.dispatchBadRequest(httpRequest, channel, request.decoderResult().cause()); + } else if (badRequestCause != null) { + transport.dispatchBadRequest(httpRequest, channel, badRequestCause); + } else { + transport.dispatchRequest(httpRequest, channel); } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java index 8ddce7a5b73..c838ae85e9d 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/HttpWriteOperation.java @@ -19,6 +19,7 @@ package org.elasticsearch.http.nio; +import io.netty.handler.codec.http.FullHttpResponse; import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.nio.WriteOperation; @@ -27,10 +28,10 @@ import java.util.function.BiConsumer; public class HttpWriteOperation implements WriteOperation { private final SocketChannelContext channelContext; - private final NioHttpResponse response; + private final FullHttpResponse response; private final BiConsumer listener; - HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer listener) { + HttpWriteOperation(SocketChannelContext channelContext, FullHttpResponse response, BiConsumer listener) { this.channelContext = channelContext; this.response = response; this.listener = listener; @@ -47,7 +48,7 @@ public class HttpWriteOperation implements WriteOperation { } @Override - public NioHttpResponse getObject() { + public FullHttpResponse getObject() { return response; } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java index cf8c92bff90..3344a312641 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyAdaptor.java @@ -53,7 +53,12 @@ public class NettyAdaptor implements AutoCloseable { try { ByteBuf message = (ByteBuf) msg; promise.addListener((f) -> message.release()); - NettyListener listener = NettyListener.fromChannelPromise(promise); + NettyListener listener; + if (promise instanceof NettyListener) { + listener = (NettyListener) promise; + } else { + listener = new NettyListener(promise); + } flushOperations.add(new FlushOperation(message.nioBuffers(), listener)); } catch (Exception e) { promise.setFailure(e); @@ -102,7 +107,18 @@ public class NettyAdaptor implements AutoCloseable { } public void write(WriteOperation writeOperation) { - nettyChannel.writeAndFlush(writeOperation.getObject(), NettyListener.fromBiConsumer(writeOperation.getListener(), nettyChannel)); + ChannelPromise channelPromise = nettyChannel.newPromise(); + channelPromise.addListener(f -> { + BiConsumer consumer = writeOperation.getListener(); + if (f.cause() == null) { + consumer.accept(null, null); + } else { + ExceptionsHelper.dieOnError(f.cause()); + consumer.accept(null, f.cause()); + } + }); + + nettyChannel.writeAndFlush(writeOperation.getObject(), new NettyListener(channelPromise)); } public FlushOperation pollOutboundOperation() { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyListener.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyListener.java index b907c0f2bc6..e806b0d23ce 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyListener.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NettyListener.java @@ -23,7 +23,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.util.concurrent.FutureUtils; import java.util.concurrent.ExecutionException; @@ -40,7 +40,7 @@ public class NettyListener implements BiConsumer, ChannelPromis private final ChannelPromise promise; - private NettyListener(ChannelPromise promise) { + NettyListener(ChannelPromise promise) { this.promise = promise; } @@ -211,30 +211,4 @@ public class NettyListener implements BiConsumer, ChannelPromis public ChannelPromise unvoid() { return promise.unvoid(); } - - public static NettyListener fromBiConsumer(BiConsumer biConsumer, Channel channel) { - if (biConsumer instanceof NettyListener) { - return (NettyListener) biConsumer; - } else { - ChannelPromise channelPromise = channel.newPromise(); - channelPromise.addListener(f -> { - if (f.cause() == null) { - biConsumer.accept(null, null); - } else { - ExceptionsHelper.dieOnError(f.cause()); - biConsumer.accept(null, f.cause()); - } - }); - - return new NettyListener(channelPromise); - } - } - - public static NettyListener fromChannelPromise(ChannelPromise channelPromise) { - if (channelPromise instanceof NettyListener) { - return (NettyListener) channelPromise; - } else { - return new NettyListener(channelPromise); - } - } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java index 97eba20a16f..672c6d5abad 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpChannel.java @@ -52,23 +52,20 @@ import java.util.EnumMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.BiConsumer; public class NioHttpChannel extends AbstractRestChannel { private final BigArrays bigArrays; - private final int sequence; private final ThreadContext threadContext; private final FullHttpRequest nettyRequest; private final NioSocketChannel nioChannel; private final boolean resetCookies; - NioHttpChannel(NioSocketChannel nioChannel, BigArrays bigArrays, NioHttpRequest request, int sequence, + NioHttpChannel(NioSocketChannel nioChannel, BigArrays bigArrays, NioHttpRequest request, HttpHandlingSettings settings, ThreadContext threadContext) { super(request, settings.getDetailedErrorsEnabled()); this.nioChannel = nioChannel; this.bigArrays = bigArrays; - this.sequence = sequence; this.threadContext = threadContext; this.nettyRequest = request.getRequest(); this.resetCookies = settings.isResetCookies(); @@ -120,8 +117,9 @@ public class NioHttpChannel extends AbstractRestChannel { toClose.add(nioChannel::close); } - BiConsumer listener = (aVoid, throwable) -> Releasables.close(toClose); - nioChannel.getContext().sendMessage(new NioHttpResponse(sequence, resp), listener); + nioChannel.getContext().sendMessage(resp, (aVoid, throwable) -> { + Releasables.close(toClose); + }); success = true; } finally { if (success == false) { diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java deleted file mode 100644 index 57a14e7819d..00000000000 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpPipeliningHandler.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.http.nio; - -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.LastHttpContent; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.http.HttpPipelinedRequest; -import org.elasticsearch.http.HttpPipeliningAggregator; -import org.elasticsearch.http.nio.NettyListener; -import org.elasticsearch.http.nio.NioHttpResponse; - -import java.nio.channels.ClosedChannelException; -import java.util.List; - -/** - * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests. - */ -public class NioHttpPipeliningHandler extends ChannelDuplexHandler { - - private final Logger logger; - private final HttpPipeliningAggregator aggregator; - - /** - * Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation. - * - * @param logger for logging unexpected errors - * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is - * required as events cannot queue up indefinitely - */ - public NioHttpPipeliningHandler(Logger logger, final int maxEventsHeld) { - this.logger = logger; - this.aggregator = new HttpPipeliningAggregator<>(maxEventsHeld); - } - - @Override - public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - if (msg instanceof LastHttpContent) { - HttpPipelinedRequest pipelinedRequest = aggregator.read(((LastHttpContent) msg).retain()); - ctx.fireChannelRead(pipelinedRequest); - } else { - ctx.fireChannelRead(msg); - } - } - - @Override - public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) { - assert msg instanceof NioHttpResponse : "Message must be type: " + NioHttpResponse.class; - NioHttpResponse response = (NioHttpResponse) msg; - boolean success = false; - try { - NettyListener listener = NettyListener.fromChannelPromise(promise); - List> readyResponses = aggregator.write(response, listener); - success = true; - for (Tuple responseToWrite : readyResponses) { - ctx.write(responseToWrite.v1().getResponse(), responseToWrite.v2()); - } - } catch (IllegalStateException e) { - ctx.channel().close(); - } finally { - if (success == false) { - promise.setFailure(new ClosedChannelException()); - } - } - } - - @Override - public void close(ChannelHandlerContext ctx, ChannelPromise promise) { - List> inflightResponses = aggregator.removeAllInflightResponses(); - - if (inflightResponses.isEmpty() == false) { - ClosedChannelException closedChannelException = new ClosedChannelException(); - for (Tuple inflightResponse : inflightResponses) { - try { - inflightResponse.v2().setFailure(closedChannelException); - } catch (RuntimeException e) { - logger.error("unexpected error while releasing pipelined http responses", e); - } - } - } - ctx.close(promise); - } -} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpResponse.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpResponse.java deleted file mode 100644 index 4b634994b45..00000000000 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpResponse.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.http.nio; - -import io.netty.handler.codec.http.FullHttpResponse; -import org.elasticsearch.http.HttpPipelinedMessage; - -public class NioHttpResponse extends HttpPipelinedMessage { - - private final FullHttpResponse response; - - public NioHttpResponse(int sequence, FullHttpResponse response) { - super(sequence); - this.response = response; - } - - public FullHttpResponse getResponse() { - return response; - } -} diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java index 825a023bd51..06f581d7ab7 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpServerTransport.java @@ -20,7 +20,6 @@ package org.elasticsearch.http.nio; import io.netty.handler.timeout.ReadTimeoutException; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ElasticsearchException; @@ -85,7 +84,6 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_D import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS; public class NioHttpServerTransport extends AbstractHttpServerTransport { @@ -126,7 +124,6 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { ByteSizeValue maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); ByteSizeValue maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings); ByteSizeValue maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings); - int pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings); this.httpHandlingSettings = new HttpHandlingSettings(Math.toIntExact(maxContentLength.getBytes()), Math.toIntExact(maxChunkSize.getBytes()), Math.toIntExact(maxHeaderSize.getBytes()), @@ -134,8 +131,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { SETTING_HTTP_RESET_COOKIES.get(settings), SETTING_HTTP_COMPRESSION.get(settings), SETTING_HTTP_COMPRESSION_LEVEL.get(settings), - SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings), - pipeliningMaxEvents); + SETTING_HTTP_DETAILED_ERRORS_ENABLED.get(settings)); this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings); this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings); @@ -144,19 +140,14 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport { this.tcpReceiveBufferSize = Math.toIntExact(SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes()); - logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}]," + - " pipelining_max_events[{}]", - maxChunkSize, maxHeaderSize, maxInitialLineLength, maxContentLength, pipeliningMaxEvents); + logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}]", + maxChunkSize, maxHeaderSize, maxInitialLineLength, maxContentLength); } BigArrays getBigArrays() { return bigArrays; } - public Logger getLogger() { - return logger; - } - @Override protected void doStart() { boolean success = false; diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java b/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java index 703f7acbf82..e0c8bacca1d 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/NioIntegTestCase.java @@ -20,7 +20,6 @@ package org.elasticsearch; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.http.nio.NioHttpServerTransport; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.nio.NioTransport; @@ -44,13 +43,11 @@ public abstract class NioIntegTestCase extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)); - // randomize nio settings + // randomize netty settings if (randomBoolean()) { builder.put(NioTransport.NIO_WORKER_COUNT.getKey(), random().nextInt(3) + 1); - builder.put(NioHttpServerTransport.NIO_HTTP_WORKER_COUNT.getKey(), random().nextInt(3) + 1); } builder.put(NetworkModule.TRANSPORT_TYPE_KEY, NioTransportPlugin.NIO_TRANSPORT_NAME); - builder.put(NetworkModule.HTTP_TYPE_KEY, NioTransportPlugin.NIO_HTTP_TRANSPORT_NAME); return builder.build(); } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java index cc8eeb77cc2..dce8319d2fc 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/HttpReadWriteHandlerTests.java @@ -61,11 +61,11 @@ import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUN import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_RESET_COOKIES; -import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; public class HttpReadWriteHandlerTests extends ESTestCase { @@ -91,8 +91,7 @@ public class HttpReadWriteHandlerTests extends ESTestCase { SETTING_HTTP_RESET_COOKIES.getDefault(settings), SETTING_HTTP_COMPRESSION.getDefault(settings), SETTING_HTTP_COMPRESSION_LEVEL.getDefault(settings), - SETTING_HTTP_DETAILED_ERRORS_ENABLED.getDefault(settings), - SETTING_PIPELINING_MAX_EVENTS.getDefault(settings)); + SETTING_HTTP_DETAILED_ERRORS_ENABLED.getDefault(settings)); ThreadContext threadContext = new ThreadContext(settings); nioSocketChannel = mock(NioSocketChannel.class); handler = new HttpReadWriteHandler(nioSocketChannel, transport, httpHandlingSettings, NamedXContentRegistry.EMPTY, threadContext); @@ -149,8 +148,7 @@ public class HttpReadWriteHandlerTests extends ESTestCase { handler.consumeReads(toChannelBuffer(buf)); - verify(transport, times(0)).dispatchBadRequest(any(), any(), any()); - verify(transport, times(0)).dispatchRequest(any(), any()); + verifyZeroInteractions(transport); List flushOperations = handler.pollFlushOperations(); assertFalse(flushOperations.isEmpty()); @@ -171,10 +169,9 @@ public class HttpReadWriteHandlerTests extends ESTestCase { prepareHandlerForResponse(handler); FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - NioHttpResponse pipelinedResponse = new NioHttpResponse(0, fullHttpResponse); SocketChannelContext context = mock(SocketChannelContext.class); - HttpWriteOperation writeOperation = new HttpWriteOperation(context, pipelinedResponse, mock(BiConsumer.class)); + HttpWriteOperation writeOperation = new HttpWriteOperation(context, fullHttpResponse, mock(BiConsumer.class)); List flushOperations = handler.writeToBytes(writeOperation); HttpResponse response = responseDecoder.decode(Unpooled.wrappedBuffer(flushOperations.get(0).getBuffersToWrite())); diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java deleted file mode 100644 index d12c608aeca..00000000000 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/http/nio/NioHttpPipeliningHandlerTests.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.http.nio; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.embedded.EmbeddedChannel; -import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.DefaultHttpRequest; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.codec.http.QueryStringDecoder; -import org.elasticsearch.common.Randomness; -import org.elasticsearch.http.HttpPipelinedRequest; -import org.elasticsearch.test.ESTestCase; -import org.junit.After; - -import java.nio.channels.ClosedChannelException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; -import static io.netty.handler.codec.http.HttpResponseStatus.OK; -import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.hamcrest.core.Is.is; - -public class NioHttpPipeliningHandlerTests extends ESTestCase { - - private final ExecutorService handlerService = Executors.newFixedThreadPool(randomIntBetween(4, 8)); - private final ExecutorService eventLoopService = Executors.newFixedThreadPool(1); - private final Map waitingRequests = new ConcurrentHashMap<>(); - private final Map finishingRequests = new ConcurrentHashMap<>(); - - @After - public void cleanup() throws Exception { - waitingRequests.keySet().forEach(this::finishRequest); - shutdownExecutorService(); - } - - private CountDownLatch finishRequest(String url) { - waitingRequests.get(url).countDown(); - return finishingRequests.get(url); - } - - private void shutdownExecutorService() throws InterruptedException { - if (!handlerService.isShutdown()) { - handlerService.shutdown(); - handlerService.awaitTermination(10, TimeUnit.SECONDS); - } - if (!eventLoopService.isShutdown()) { - eventLoopService.shutdown(); - eventLoopService.awaitTermination(10, TimeUnit.SECONDS); - } - } - - public void testThatPipeliningWorksWithFastSerializedRequests() throws InterruptedException { - final int numberOfRequests = randomIntBetween(2, 128); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests), - new WorkEmulatorHandler()); - - for (int i = 0; i < numberOfRequests; i++) { - embeddedChannel.writeInbound(createHttpRequest("/" + String.valueOf(i))); - } - - final List latches = new ArrayList<>(); - for (final String url : waitingRequests.keySet()) { - latches.add(finishRequest(url)); - } - - for (final CountDownLatch latch : latches) { - latch.await(); - } - - embeddedChannel.flush(); - - for (int i = 0; i < numberOfRequests; i++) { - assertReadHttpMessageHasContent(embeddedChannel, String.valueOf(i)); - } - - assertTrue(embeddedChannel.isOpen()); - } - - public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws InterruptedException { - final int numberOfRequests = randomIntBetween(2, 128); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests), - new WorkEmulatorHandler()); - - for (int i = 0; i < numberOfRequests; i++) { - embeddedChannel.writeInbound(createHttpRequest("/" + String.valueOf(i))); - } - - // random order execution - final List urls = new ArrayList<>(waitingRequests.keySet()); - Randomness.shuffle(urls); - final List latches = new ArrayList<>(); - for (final String url : urls) { - latches.add(finishRequest(url)); - } - - for (final CountDownLatch latch : latches) { - latch.await(); - } - - embeddedChannel.flush(); - - for (int i = 0; i < numberOfRequests; i++) { - assertReadHttpMessageHasContent(embeddedChannel, String.valueOf(i)); - } - - assertTrue(embeddedChannel.isOpen()); - } - - public void testThatPipeliningWorksWithChunkedRequests() throws InterruptedException { - final int numberOfRequests = randomIntBetween(2, 128); - final EmbeddedChannel embeddedChannel = - new EmbeddedChannel( - new AggregateUrisAndHeadersHandler(), - new NioHttpPipeliningHandler(logger, numberOfRequests), - new WorkEmulatorHandler()); - - for (int i = 0; i < numberOfRequests; i++) { - final DefaultHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/" + i); - embeddedChannel.writeInbound(request); - embeddedChannel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT); - } - - final List latches = new ArrayList<>(); - for (int i = numberOfRequests - 1; i >= 0; i--) { - latches.add(finishRequest(Integer.toString(i))); - } - - for (final CountDownLatch latch : latches) { - latch.await(); - } - - embeddedChannel.flush(); - - for (int i = 0; i < numberOfRequests; i++) { - assertReadHttpMessageHasContent(embeddedChannel, Integer.toString(i)); - } - - assertTrue(embeddedChannel.isOpen()); - } - - public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException { - final int numberOfRequests = randomIntBetween(2, 128); - final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests), - new WorkEmulatorHandler()); - - for (int i = 0; i < 1 + numberOfRequests + 1; i++) { - embeddedChannel.writeInbound(createHttpRequest("/" + Integer.toString(i))); - } - - final List latches = new ArrayList<>(); - final List requests = IntStream.range(1, numberOfRequests + 1).boxed().collect(Collectors.toList()); - Randomness.shuffle(requests); - - for (final Integer request : requests) { - latches.add(finishRequest(request.toString())); - } - - for (final CountDownLatch latch : latches) { - latch.await(); - } - - finishRequest(Integer.toString(numberOfRequests + 1)).await(); - - embeddedChannel.flush(); - - assertFalse(embeddedChannel.isOpen()); - } - - public void testPipeliningRequestsAreReleased() throws InterruptedException { - final int numberOfRequests = 10; - final EmbeddedChannel embeddedChannel = - new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests + 1)); - - for (int i = 0; i < numberOfRequests; i++) { - embeddedChannel.writeInbound(createHttpRequest("/" + i)); - } - - HttpPipelinedRequest inbound; - ArrayList> requests = new ArrayList<>(); - while ((inbound = embeddedChannel.readInbound()) != null) { - requests.add(inbound); - } - - ArrayList promises = new ArrayList<>(); - for (int i = 1; i < requests.size(); ++i) { - final FullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK); - ChannelPromise promise = embeddedChannel.newPromise(); - promises.add(promise); - int sequence = requests.get(i).getSequence(); - NioHttpResponse resp = new NioHttpResponse(sequence, httpResponse); - embeddedChannel.writeAndFlush(resp, promise); - } - - for (ChannelPromise promise : promises) { - assertFalse(promise.isDone()); - } - embeddedChannel.close().syncUninterruptibly(); - for (ChannelPromise promise : promises) { - assertTrue(promise.isDone()); - assertTrue(promise.cause() instanceof ClosedChannelException); - } - } - - private void assertReadHttpMessageHasContent(EmbeddedChannel embeddedChannel, String expectedContent) { - FullHttpResponse response = (FullHttpResponse) embeddedChannel.outboundMessages().poll(); - assertNotNull("Expected response to exist, maybe you did not wait long enough?", response); - assertNotNull("Expected response to have content " + expectedContent, response.content()); - String data = new String(ByteBufUtil.getBytes(response.content()), StandardCharsets.UTF_8); - assertThat(data, is(expectedContent)); - } - - private FullHttpRequest createHttpRequest(String uri) { - return new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri); - } - - private static class AggregateUrisAndHeadersHandler extends SimpleChannelInboundHandler { - - static final Queue QUEUE_URI = new LinkedTransferQueue<>(); - - @Override - protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception { - QUEUE_URI.add(request.uri()); - } - - } - - private class WorkEmulatorHandler extends SimpleChannelInboundHandler> { - - @Override - protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) { - LastHttpContent request = pipelinedRequest.getRequest(); - final QueryStringDecoder decoder; - if (request instanceof FullHttpRequest) { - decoder = new QueryStringDecoder(((FullHttpRequest)request).uri()); - } else { - decoder = new QueryStringDecoder(AggregateUrisAndHeadersHandler.QUEUE_URI.poll()); - } - - final String uri = decoder.path().replace("/", ""); - final ByteBuf content = Unpooled.copiedBuffer(uri, StandardCharsets.UTF_8); - final DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK, content); - httpResponse.headers().add(CONTENT_LENGTH, content.readableBytes()); - - final CountDownLatch waitingLatch = new CountDownLatch(1); - waitingRequests.put(uri, waitingLatch); - final CountDownLatch finishingLatch = new CountDownLatch(1); - finishingRequests.put(uri, finishingLatch); - - handlerService.submit(() -> { - try { - waitingLatch.await(1000, TimeUnit.SECONDS); - final ChannelPromise promise = ctx.newPromise(); - eventLoopService.submit(() -> { - ctx.write(new NioHttpResponse(pipelinedRequest.getSequence(), httpResponse), promise); - finishingLatch.countDown(); - }); - } catch (InterruptedException e) { - fail(e.toString()); - } - }); - } - } -} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index d9cf0f630c0..c19cbe4687c 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -227,6 +227,7 @@ public final class ClusterSettings extends AbstractScopedSettings { HttpTransportSettings.SETTING_CORS_ENABLED, HttpTransportSettings.SETTING_CORS_MAX_AGE, HttpTransportSettings.SETTING_HTTP_DETAILED_ERRORS_ENABLED, + HttpTransportSettings.SETTING_PIPELINING, HttpTransportSettings.SETTING_CORS_ALLOW_ORIGIN, HttpTransportSettings.SETTING_HTTP_HOST, HttpTransportSettings.SETTING_HTTP_PUBLISH_HOST, diff --git a/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java b/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java index df038e8303e..f86049292f3 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java +++ b/server/src/main/java/org/elasticsearch/http/HttpHandlingSettings.java @@ -29,11 +29,9 @@ public class HttpHandlingSettings { private final boolean compression; private final int compressionLevel; private final boolean detailedErrorsEnabled; - private final int pipeliningMaxEvents; public HttpHandlingSettings(int maxContentLength, int maxChunkSize, int maxHeaderSize, int maxInitialLineLength, - boolean resetCookies, boolean compression, int compressionLevel, boolean detailedErrorsEnabled, - int pipeliningMaxEvents) { + boolean resetCookies, boolean compression, int compressionLevel, boolean detailedErrorsEnabled) { this.maxContentLength = maxContentLength; this.maxChunkSize = maxChunkSize; this.maxHeaderSize = maxHeaderSize; @@ -42,7 +40,6 @@ public class HttpHandlingSettings { this.compression = compression; this.compressionLevel = compressionLevel; this.detailedErrorsEnabled = detailedErrorsEnabled; - this.pipeliningMaxEvents = pipeliningMaxEvents; } public int getMaxContentLength() { @@ -76,8 +73,4 @@ public class HttpHandlingSettings { public boolean getDetailedErrorsEnabled() { return detailedErrorsEnabled; } - - public int getPipeliningMaxEvents() { - return pipeliningMaxEvents; - } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpPipelinedMessage.java b/server/src/main/java/org/elasticsearch/http/HttpPipelinedMessage.java deleted file mode 100644 index 7db8666e73a..00000000000 --- a/server/src/main/java/org/elasticsearch/http/HttpPipelinedMessage.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.http; - -public class HttpPipelinedMessage implements Comparable { - - private final int sequence; - - public HttpPipelinedMessage(int sequence) { - this.sequence = sequence; - } - - public int getSequence() { - return sequence; - } - - @Override - public int compareTo(HttpPipelinedMessage o) { - return Integer.compare(sequence, o.sequence); - } -} diff --git a/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java b/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java deleted file mode 100644 index df8bd7ee1eb..00000000000 --- a/server/src/main/java/org/elasticsearch/http/HttpPipelinedRequest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.http; - -public class HttpPipelinedRequest extends HttpPipelinedMessage { - - private final R request; - - HttpPipelinedRequest(int sequence, R request) { - super(sequence); - this.request = request; - } - - public R getRequest() { - return request; - } -} diff --git a/server/src/main/java/org/elasticsearch/http/HttpPipeliningAggregator.java b/server/src/main/java/org/elasticsearch/http/HttpPipeliningAggregator.java deleted file mode 100644 index f38e9677979..00000000000 --- a/server/src/main/java/org/elasticsearch/http/HttpPipeliningAggregator.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.http; - -import org.elasticsearch.common.collect.Tuple; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; - -public class HttpPipeliningAggregator { - - private final int maxEventsHeld; - private final PriorityQueue> outboundHoldingQueue; - /* - * The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the - * channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the - * current write sequence, implying that all preceding messages have been written. - */ - private int readSequence; - private int writeSequence; - - public HttpPipeliningAggregator(int maxEventsHeld) { - this.maxEventsHeld = maxEventsHeld; - this.outboundHoldingQueue = new PriorityQueue<>(1, Comparator.comparing(Tuple::v1)); - } - - public HttpPipelinedRequest read(final Request request) { - return new HttpPipelinedRequest<>(readSequence++, request); - } - - public List> write(final Response response, Listener listener) { - if (outboundHoldingQueue.size() < maxEventsHeld) { - ArrayList> readyResponses = new ArrayList<>(); - outboundHoldingQueue.add(new Tuple<>(response, listener)); - while (!outboundHoldingQueue.isEmpty()) { - /* - * Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence - * number does not match the current write sequence number then we have not processed all preceding responses yet. - */ - final Tuple top = outboundHoldingQueue.peek(); - - if (top.v1().getSequence() != writeSequence) { - break; - } - outboundHoldingQueue.poll(); - readyResponses.add(top); - writeSequence++; - } - - return readyResponses; - } else { - int eventCount = outboundHoldingQueue.size() + 1; - throw new IllegalStateException("Too many pipelined events [" + eventCount + "]. Max events allowed [" - + maxEventsHeld + "]."); - } - } - - public List> removeAllInflightResponses() { - ArrayList> responses = new ArrayList<>(outboundHoldingQueue); - outboundHoldingQueue.clear(); - return responses; - } -} diff --git a/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java b/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java index 4670137d09a..98451e0c304 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java +++ b/server/src/main/java/org/elasticsearch/http/HttpTransportSettings.java @@ -49,6 +49,8 @@ public final class HttpTransportSettings { new Setting<>("http.cors.allow-headers", "X-Requested-With,Content-Type,Content-Length", (value) -> value, Property.NodeScope); public static final Setting SETTING_CORS_ALLOW_CREDENTIALS = Setting.boolSetting("http.cors.allow-credentials", false, Property.NodeScope); + public static final Setting SETTING_PIPELINING = + Setting.boolSetting("http.pipelining", true, Property.NodeScope); public static final Setting SETTING_PIPELINING_MAX_EVENTS = Setting.intSetting("http.pipelining.max_events", 10000, Property.NodeScope); public static final Setting SETTING_HTTP_COMPRESSION = diff --git a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java index fdc36152cc8..fc284b9f5e8 100644 --- a/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/single/SingleNodeDiscoveryIT.java @@ -150,6 +150,7 @@ public class SingleNodeDiscoveryIT extends ESIntegTestCase { internalCluster().getClusterName(), configurationSource, 0, + false, "other", Arrays.asList(getTestTransportPlugin(), MockHttpTransport.TestPlugin.class), Function.identity())) { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 505a5937d29..7210fadd7ea 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1829,7 +1829,7 @@ public abstract class ESIntegTestCase extends ESTestCase { return new InternalTestCluster(seed, createTempDir(), supportsDedicatedMasters, getAutoMinMasterNodes(), minNumDataNodes, maxNumDataNodes, InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(), - nodePrefix, mockPlugins, getClientWrapper()); + InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper()); } protected NodeConfigurationSource getNodeConfigSource() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index efe775f7415..de4b5a5ae05 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -171,6 +171,8 @@ public final class InternalTestCluster extends TestCluster { static final int DEFAULT_MIN_NUM_CLIENT_NODES = 0; static final int DEFAULT_MAX_NUM_CLIENT_NODES = 1; + static final boolean DEFAULT_ENABLE_HTTP_PIPELINING = true; + /* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */ private final NavigableMap nodes = new TreeMap<>(); @@ -217,7 +219,7 @@ public final class InternalTestCluster extends TestCluster { public InternalTestCluster(long clusterSeed, Path baseDir, boolean randomlyAddDedicatedMasters, boolean autoManageMinMasterNodes, int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes, - String nodePrefix, Collection> mockPlugins, Function clientWrapper) { + boolean enableHttpPipelining, String nodePrefix, Collection> mockPlugins, Function clientWrapper) { super(clusterSeed); this.autoManageMinMasterNodes = autoManageMinMasterNodes; this.clientWrapper = clientWrapper; @@ -298,6 +300,7 @@ public final class InternalTestCluster extends TestCluster { builder.put(Environment.PATH_REPO_SETTING.getKey(), baseDir.resolve("repos")); builder.put(TcpTransport.PORT.getKey(), 0); builder.put("http.port", 0); + builder.put("http.pipelining", enableHttpPipelining); if (Strings.hasLength(System.getProperty("tests.es.logger.level"))) { builder.put("logger.level", System.getProperty("tests.es.logger.level")); } diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index 23f44c560ba..ca674e61651 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -19,6 +19,8 @@ */ package org.elasticsearch.test.test; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; @@ -26,7 +28,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.env.NodeEnvironment; @@ -62,6 +63,8 @@ import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_M import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileNotExists; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.not; /** @@ -83,15 +86,16 @@ public class InternalTestClusterTests extends ESTestCase { String clusterName = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); NodeConfigurationSource nodeConfigurationSource = NodeConfigurationSource.EMPTY; int numClientNodes = randomIntBetween(0, 10); + boolean enableHttpPipelining = randomBoolean(); String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); Path baseDir = createTempDir(); InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, - nodePrefix, Collections.emptyList(), Function.identity()); + enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, randomBoolean(), minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, - nodePrefix, Collections.emptyList(), Function.identity()); + enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); // TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way assertClusters(cluster0, cluster1, false); @@ -207,15 +211,16 @@ public class InternalTestClusterTests extends ESTestCase { } }; + boolean enableHttpPipelining = randomBoolean(); String nodePrefix = "foobar"; Path baseDir = createTempDir(); InternalTestCluster cluster0 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, - nodePrefix, mockPlugins(), Function.identity()); + enableHttpPipelining, nodePrefix, mockPlugins(), Function.identity()); InternalTestCluster cluster1 = new InternalTestCluster(clusterSeed, baseDir, masterNodes, autoManageMinMasterNodes, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, - nodePrefix, mockPlugins(), Function.identity()); + enableHttpPipelining, nodePrefix, mockPlugins(), Function.identity()); assertClusters(cluster0, cluster1, false); long seed = randomLong(); @@ -275,11 +280,12 @@ public class InternalTestClusterTests extends ESTestCase { .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build(); } }; + boolean enableHttpPipelining = randomBoolean(); String nodePrefix = "test"; Path baseDir = createTempDir(); InternalTestCluster cluster = new InternalTestCluster(clusterSeed, baseDir, masterNodes, true, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, - nodePrefix, mockPlugins(), Function.identity()); + enableHttpPipelining, nodePrefix, mockPlugins(), Function.identity()); try { cluster.beforeTest(random(), 0.0); final int originalMasterCount = cluster.numMasterNodes(); @@ -384,7 +390,7 @@ public class InternalTestClusterTests extends ESTestCase { return Settings.builder() .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build(); } - }, 0, "", mockPlugins(), Function.identity()); + }, 0, randomBoolean(), "", mockPlugins(), Function.identity()); cluster.beforeTest(random(), 0.0); List roles = new ArrayList<>(); for (int i = 0; i < numNodes; i++) { @@ -467,12 +473,13 @@ public class InternalTestClusterTests extends ESTestCase { .put(NetworkModule.TRANSPORT_TYPE_KEY, transportClient).build(); } }; + boolean enableHttpPipelining = randomBoolean(); String nodePrefix = "test"; Path baseDir = createTempDir(); List> plugins = new ArrayList<>(mockPlugins()); plugins.add(NodeAttrCheckPlugin.class); InternalTestCluster cluster = new InternalTestCluster(randomLong(), baseDir, false, true, 2, 2, - "test", nodeConfigurationSource, 0, nodePrefix, + "test", nodeConfigurationSource, 0, enableHttpPipelining, nodePrefix, plugins, Function.identity()); try { cluster.beforeTest(random(), 0.0); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java index dab3d023f65..ec448f14e91 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java @@ -214,7 +214,7 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase { mockPlugins.add(getTestTransportPlugin()); } remoteCluster = new InternalTestCluster(randomLong(), createTempDir(), false, true, numNodes, numNodes, cluster2Name, - cluster2SettingsSource, 0, SECOND_CLUSTER_NODE_PREFIX, mockPlugins, + cluster2SettingsSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, mockPlugins, useSecurity ? getClientWrapper() : Function.identity()); remoteCluster.beforeTest(random(), 0.5); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/RemoteIndexAuditTrailStartingTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/RemoteIndexAuditTrailStartingTests.java index 96bba962237..7002803a3d4 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/RemoteIndexAuditTrailStartingTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/RemoteIndexAuditTrailStartingTests.java @@ -117,7 +117,7 @@ public class RemoteIndexAuditTrailStartingTests extends SecurityIntegTestCase { } }; remoteCluster = new InternalTestCluster(randomLong(), createTempDir(), false, true, numNodes, numNodes, - cluster2Name, cluster2SettingsSource, 0, SECOND_CLUSTER_NODE_PREFIX, getMockPlugins(), getClientWrapper()); + cluster2Name, cluster2SettingsSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, getMockPlugins(), getClientWrapper()); remoteCluster.beforeTest(random(), 0.0); assertNoTimeout(remoteCluster.client().admin().cluster().prepareHealth().setWaitForGreenStatus().get()); }