diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java index ffabe5cbbe2..e0ad3007c98 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.http.netty4; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.FullHttpRequest; @@ -28,7 +30,6 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.ServerCookieDecoder; import io.netty.handler.codec.http.cookie.ServerCookieEncoder; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.http.HttpRequest; import org.elasticsearch.rest.RestRequest; @@ -41,23 +42,30 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; public class Netty4HttpRequest implements HttpRequest { - private final FullHttpRequest request; - private final BytesReference content; private final HttpHeadersMap headers; private final int sequence; + private final AtomicBoolean released; + private final FullHttpRequest request; + private final boolean pooled; + private final BytesReference content; Netty4HttpRequest(FullHttpRequest request, int sequence) { + this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true, + Netty4Utils.toBytesReference(request.content())); + } + + private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled, + BytesReference content) { this.request = request; - headers = new HttpHeadersMap(request.headers()); this.sequence = sequence; - if (request.content().isReadable()) { - this.content = Netty4Utils.toBytesReference(request.content()); - } else { - this.content = BytesArray.EMPTY; - } + this.headers = headers; + this.content = content; + this.pooled = pooled; + this.released = released; } @Override @@ -105,9 +113,33 @@ public class Netty4HttpRequest implements HttpRequest { @Override public BytesReference content() { + assert released.get() == false; return content; } + @Override + public void release() { + if (pooled && released.compareAndSet(false, true)) { + request.release(); + } + } + + @Override + public HttpRequest releaseAndCopy() { + assert released.get() == false; + if (pooled == false) { + return this; + } + try { + final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content()); + return new Netty4HttpRequest( + new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(), + request.trailingHeaders()), + headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent)); + } finally { + release(); + } + } @Override public final Map> getHeaders() { @@ -147,7 +179,8 @@ public class Netty4HttpRequest implements HttpRequest { trailingHeaders.remove(header); FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), request.content(), headersWithoutContentTypeHeader, trailingHeaders); - return new Netty4HttpRequest(requestWithoutHeader, sequence); + return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released, + pooled, content); } @Override diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index ad6d84dfcb4..7e7f45ef92e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -19,11 +19,9 @@ package org.elasticsearch.http.netty4; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.http.HttpPipelinedRequest; @@ -41,32 +39,25 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler msg) { Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get(); FullHttpRequest request = msg.getRequest(); - final FullHttpRequest copiedRequest; + boolean success = false; + Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence()); try { - copiedRequest = - new DefaultFullHttpRequest( - request.protocolVersion(), - request.method(), - request.uri(), - Unpooled.copiedBuffer(request.content()), - request.headers(), - request.trailingHeaders()); - } finally { - // As we have copied the buffer, we can release the request - request.release(); - } - Netty4HttpRequest httpRequest = new Netty4HttpRequest(copiedRequest, msg.getSequence()); - - if (request.decoderResult().isFailure()) { - Throwable cause = request.decoderResult().cause(); - if (cause instanceof Error) { - ExceptionsHelper.maybeDieOnAnotherThread(cause); - serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause)); + if (request.decoderResult().isFailure()) { + Throwable cause = request.decoderResult().cause(); + if (cause instanceof Error) { + ExceptionsHelper.maybeDieOnAnotherThread(cause); + serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause)); + } else { + serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause); + } } else { - serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause); + serverTransport.incomingRequest(httpRequest, channel); + } + success = true; + } finally { + if (success == false) { + httpRequest.release(); } - } else { - serverTransport.incomingRequest(httpRequest, channel); } } diff --git a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java index 08937593f3b..8e17d37699c 100644 --- a/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java +++ b/plugins/transport-nio/src/main/java/org/elasticsearch/http/nio/NioHttpRequest.java @@ -108,6 +108,15 @@ public class NioHttpRequest implements HttpRequest { return content; } + @Override + public void release() { + // NioHttpRequest works from copied unpooled bytes no need to release anything + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } @Override public final Map> getHeaders() { diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index 098a0141089..98f578bb341 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -77,7 +77,8 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann @Override public void sendResponse(RestResponse restResponse) { - final ArrayList toClose = new ArrayList<>(3); + final ArrayList toClose = new ArrayList<>(4); + toClose.add(httpRequest::release); if (isCloseConnection()) { toClose.add(() -> CloseableChannel.closeChannel(httpChannel)); } diff --git a/server/src/main/java/org/elasticsearch/http/HttpRequest.java b/server/src/main/java/org/elasticsearch/http/HttpRequest.java index 02a3a58d170..4d67078fe57 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpRequest.java +++ b/server/src/main/java/org/elasticsearch/http/HttpRequest.java @@ -68,4 +68,16 @@ public interface HttpRequest { */ HttpResponse createResponse(RestStatus status, BytesReference content); + /** + * Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()} + * after this method has been invoked is undefined and implementation specific. + */ + void release(); + + /** + * If this instances uses any pooled resources, creates a copy of this instance that does not use any pooled resources and releases + * any resources associated with this instance. If the instance does not use any shared resources, returns itself. + * @return a safe unpooled http request + */ + HttpRequest releaseAndCopy(); } diff --git a/server/src/main/java/org/elasticsearch/rest/RestController.java b/server/src/main/java/org/elasticsearch/rest/RestController.java index 1f29e9ed0a8..3eb4c0df05e 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestController.java +++ b/server/src/main/java/org/elasticsearch/rest/RestController.java @@ -220,6 +220,10 @@ public class RestController implements HttpServerTransport.Dispatcher { } // iff we could reserve bytes for the request we need to send the response also over this channel responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); + // TODO: Count requests double in the circuit breaker if they need copying? + if (handler.allowsUnsafeBuffers() == false) { + request.ensureSafeBuffers(); + } handler.handleRequest(request, responseChannel, client); } catch (Exception e) { responseChannel.sendResponse(new BytesRestResponse(responseChannel, e)); diff --git a/server/src/main/java/org/elasticsearch/rest/RestHandler.java b/server/src/main/java/org/elasticsearch/rest/RestHandler.java index 1ebc7a7fd1b..605dd41078a 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestHandler.java +++ b/server/src/main/java/org/elasticsearch/rest/RestHandler.java @@ -47,4 +47,16 @@ public interface RestHandler { default boolean supportsContentStream() { return false; } + + /** + * Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return + * {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the + * {@link RestRequest} instance before passing a request to this handler. If this instance does not support pooled/unsafe buffers + * {@link RestRequest#ensureSafeBuffers()} should be called on any request before passing it to {@link #handleRequest}. + * + * @return true iff the handler supports requests that make use of pooled buffers + */ + default boolean allowsUnsafeBuffers() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java index 405cf7f68ef..61521735f00 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -64,9 +64,10 @@ public class RestRequest implements ToXContent.Params { private final String rawPath; private final Set consumedParams = new HashSet<>(); private final SetOnce xContentType = new SetOnce<>(); - private final HttpRequest httpRequest; private final HttpChannel httpChannel; + private HttpRequest httpRequest; + private boolean contentConsumed = false; public boolean isContentConsumed() { @@ -97,6 +98,15 @@ public class RestRequest implements ToXContent.Params { restRequest.getHttpRequest(), restRequest.getHttpChannel()); } + /** + * Invoke {@link HttpRequest#releaseAndCopy()} on the http request in this instance and replace a pooled http request + * with an unpooled copy. This is supposed to be used before passing requests to {@link RestHandler} instances that can not safely + * handle http requests that use pooled buffers as determined by {@link RestHandler#allowsUnsafeBuffers()}. + */ + void ensureSafeBuffers() { + httpRequest = httpRequest.releaseAndCopy(); + } + /** * Creates a new REST request. This method will throw {@link BadParameterException} if the path cannot be * decoded diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index 33fd1d46b3e..4eb2a49884f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -103,4 +103,9 @@ public class RestBulkAction extends BaseRestHandler { public boolean supportsContentStream() { return true; } + + @Override + public boolean allowsUnsafeBuffers() { + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 20dbbd4b55c..c238f47f765 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -324,4 +324,9 @@ public class RestSearchAction extends BaseRestHandler { protected Set responseParams() { return RESPONSE_PARAMS; } + + @Override + public boolean allowsUnsafeBuffers() { + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java index 85670e893b9..d671b81a09b 100644 --- a/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java +++ b/server/src/test/java/org/elasticsearch/http/DefaultRestChannelTests.java @@ -460,6 +460,15 @@ public class DefaultRestChannelTests extends ESTestCase { public HttpResponse createResponse(RestStatus status, BytesReference content) { return new TestResponse(status, content); } + + @Override + public void release() { + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } } private static class TestResponse implements HttpResponse { diff --git a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java index d9e7ede5df9..8994817780f 100644 --- a/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/server/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -575,6 +575,15 @@ public class RestControllerTests extends ESTestCase { public HttpResponse createResponse(RestStatus status, BytesReference content) { return null; } + + @Override + public void release() { + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } }, null); final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index a659d6af5c6..2f2f5fb76bf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -113,6 +113,15 @@ public class FakeRestRequest extends RestRequest { } }; } + + @Override + public void release() { + } + + @Override + public HttpRequest releaseAndCopy() { + return this; + } } private static class FakeHttpChannel implements HttpChannel { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java index df678f9c63b..4131d1e7358 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/SecurityRestFilter.java @@ -86,6 +86,11 @@ public class SecurityRestFilter implements RestHandler { return restHandler.supportsContentStream(); } + @Override + public boolean allowsUnsafeBuffers() { + return restHandler.allowsUnsafeBuffers(); + } + private RestRequest maybeWrapRestRequest(RestRequest restRequest) throws IOException { if (restHandler instanceof RestRequestFilter) { return ((RestRequestFilter)restHandler).getFilteredRequest(restRequest);