Stop Copying Every Http Request in Message Handler (#44564) (#49809)

* Copying the request is not necessary here. We can simply release it once the response has been generated and a lot of `Unpooled` allocations that way
* Relates #32228
   * I think the issue that preventet that PR  that PR from being merged was solved by #39634 that moved the bulk index marker search to ByteBuf bulk access so the composite buffer shouldn't require many additional bounds checks  (I'd argue the bounds checks we add, we save when copying the composite buffer)
* I couldn't neccessarily reproduce much of a speedup from this change, but I could reproduce a very measureable reduction in GC time with e.g. Rally's PMC (4g heap node and bulk requests of size 5k saw a reduction in young GC time by ~10% for me)
This commit is contained in:
Armin Braun 2019-12-04 08:41:42 +01:00 committed by GitHub
parent c33be29dc7
commit 996cddd98b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 151 additions and 37 deletions

View File

@ -19,6 +19,8 @@
package org.elasticsearch.http.netty4;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
@ -28,7 +30,6 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.rest.RestRequest;
@ -41,23 +42,30 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class Netty4HttpRequest implements HttpRequest {
private final FullHttpRequest request;
private final BytesReference content;
private final HttpHeadersMap headers;
private final int sequence;
private final AtomicBoolean released;
private final FullHttpRequest request;
private final boolean pooled;
private final BytesReference content;
Netty4HttpRequest(FullHttpRequest request, int sequence) {
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
Netty4Utils.toBytesReference(request.content()));
}
private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
BytesReference content) {
this.request = request;
headers = new HttpHeadersMap(request.headers());
this.sequence = sequence;
if (request.content().isReadable()) {
this.content = Netty4Utils.toBytesReference(request.content());
} else {
this.content = BytesArray.EMPTY;
}
this.headers = headers;
this.content = content;
this.pooled = pooled;
this.released = released;
}
@Override
@ -105,9 +113,33 @@ public class Netty4HttpRequest implements HttpRequest {
@Override
public BytesReference content() {
assert released.get() == false;
return content;
}
@Override
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
}
}
@Override
public HttpRequest releaseAndCopy() {
assert released.get() == false;
if (pooled == false) {
return this;
}
try {
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
return new Netty4HttpRequest(
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
request.trailingHeaders()),
headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
} finally {
release();
}
}
@Override
public final Map<String, List<String>> getHeaders() {
@ -147,7 +179,8 @@ public class Netty4HttpRequest implements HttpRequest {
trailingHeaders.remove(header);
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
return new Netty4HttpRequest(requestWithoutHeader, sequence);
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
pooled, content);
}
@Override

View File

@ -19,11 +19,9 @@
package org.elasticsearch.http.netty4;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
@ -41,32 +39,25 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
final FullHttpRequest copiedRequest;
boolean success = false;
Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
try {
copiedRequest =
new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.copiedBuffer(request.content()),
request.headers(),
request.trailingHeaders());
} finally {
// As we have copied the buffer, we can release the request
request.release();
}
Netty4HttpRequest httpRequest = new Netty4HttpRequest(copiedRequest, msg.getSequence());
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
}
} else {
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
serverTransport.incomingRequest(httpRequest, channel);
}
success = true;
} finally {
if (success == false) {
httpRequest.release();
}
} else {
serverTransport.incomingRequest(httpRequest, channel);
}
}

View File

@ -108,6 +108,15 @@ public class NioHttpRequest implements HttpRequest {
return content;
}
@Override
public void release() {
// NioHttpRequest works from copied unpooled bytes no need to release anything
}
@Override
public HttpRequest releaseAndCopy() {
return this;
}
@Override
public final Map<String, List<String>> getHeaders() {

View File

@ -77,7 +77,8 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann
@Override
public void sendResponse(RestResponse restResponse) {
final ArrayList<Releasable> toClose = new ArrayList<>(3);
final ArrayList<Releasable> toClose = new ArrayList<>(4);
toClose.add(httpRequest::release);
if (isCloseConnection()) {
toClose.add(() -> CloseableChannel.closeChannel(httpChannel));
}

View File

@ -68,4 +68,16 @@ public interface HttpRequest {
*/
HttpResponse createResponse(RestStatus status, BytesReference content);
/**
* Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()}
* after this method has been invoked is undefined and implementation specific.
*/
void release();
/**
* If this instances uses any pooled resources, creates a copy of this instance that does not use any pooled resources and releases
* any resources associated with this instance. If the instance does not use any shared resources, returns itself.
* @return a safe unpooled http request
*/
HttpRequest releaseAndCopy();
}

View File

@ -220,6 +220,10 @@ public class RestController implements HttpServerTransport.Dispatcher {
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
// TODO: Count requests double in the circuit breaker if they need copying?
if (handler.allowsUnsafeBuffers() == false) {
request.ensureSafeBuffers();
}
handler.handleRequest(request, responseChannel, client);
} catch (Exception e) {
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));

View File

@ -47,4 +47,16 @@ public interface RestHandler {
default boolean supportsContentStream() {
return false;
}
/**
* Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return
* {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the
* {@link RestRequest} instance before passing a request to this handler. If this instance does not support pooled/unsafe buffers
* {@link RestRequest#ensureSafeBuffers()} should be called on any request before passing it to {@link #handleRequest}.
*
* @return true iff the handler supports requests that make use of pooled buffers
*/
default boolean allowsUnsafeBuffers() {
return false;
}
}

View File

@ -64,9 +64,10 @@ public class RestRequest implements ToXContent.Params {
private final String rawPath;
private final Set<String> consumedParams = new HashSet<>();
private final SetOnce<XContentType> xContentType = new SetOnce<>();
private final HttpRequest httpRequest;
private final HttpChannel httpChannel;
private HttpRequest httpRequest;
private boolean contentConsumed = false;
public boolean isContentConsumed() {
@ -97,6 +98,15 @@ public class RestRequest implements ToXContent.Params {
restRequest.getHttpRequest(), restRequest.getHttpChannel());
}
/**
* Invoke {@link HttpRequest#releaseAndCopy()} on the http request in this instance and replace a pooled http request
* with an unpooled copy. This is supposed to be used before passing requests to {@link RestHandler} instances that can not safely
* handle http requests that use pooled buffers as determined by {@link RestHandler#allowsUnsafeBuffers()}.
*/
void ensureSafeBuffers() {
httpRequest = httpRequest.releaseAndCopy();
}
/**
* Creates a new REST request. This method will throw {@link BadParameterException} if the path cannot be
* decoded

View File

@ -103,4 +103,9 @@ public class RestBulkAction extends BaseRestHandler {
public boolean supportsContentStream() {
return true;
}
@Override
public boolean allowsUnsafeBuffers() {
return true;
}
}

View File

@ -324,4 +324,9 @@ public class RestSearchAction extends BaseRestHandler {
protected Set<String> responseParams() {
return RESPONSE_PARAMS;
}
@Override
public boolean allowsUnsafeBuffers() {
return true;
}
}

View File

@ -460,6 +460,15 @@ public class DefaultRestChannelTests extends ESTestCase {
public HttpResponse createResponse(RestStatus status, BytesReference content) {
return new TestResponse(status, content);
}
@Override
public void release() {
}
@Override
public HttpRequest releaseAndCopy() {
return this;
}
}
private static class TestResponse implements HttpResponse {

View File

@ -575,6 +575,15 @@ public class RestControllerTests extends ESTestCase {
public HttpResponse createResponse(RestStatus status, BytesReference content) {
return null;
}
@Override
public void release() {
}
@Override
public HttpRequest releaseAndCopy() {
return this;
}
}, null);
final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED);

View File

@ -113,6 +113,15 @@ public class FakeRestRequest extends RestRequest {
}
};
}
@Override
public void release() {
}
@Override
public HttpRequest releaseAndCopy() {
return this;
}
}
private static class FakeHttpChannel implements HttpChannel {

View File

@ -86,6 +86,11 @@ public class SecurityRestFilter implements RestHandler {
return restHandler.supportsContentStream();
}
@Override
public boolean allowsUnsafeBuffers() {
return restHandler.allowsUnsafeBuffers();
}
private RestRequest maybeWrapRestRequest(RestRequest restRequest) throws IOException {
if (restHandler instanceof RestRequestFilter) {
return ((RestRequestFilter)restHandler).getFilteredRequest(restRequest);