Stop Copying Bulk HTTP Requests in NIO Networking (#49819) (#51393)

Same as #44564 but for NIO.
This commit is contained in:
Armin Braun 2020-01-24 11:23:16 +01:00 committed by GitHub
parent 8703b885c2
commit c29b235a5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 36 deletions

View File

@ -24,6 +24,7 @@ import io.netty.buffer.Unpooled;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.bytes.AbstractBytesReference;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -70,7 +71,8 @@ class ByteBufUtils {
}
static BytesReference toBytesReference(final ByteBuf buffer) {
return new ByteBufBytesReference(buffer, buffer.readableBytes());
final int readableBytes = buffer.readableBytes();
return readableBytes == 0 ? BytesArray.EMPTY : new ByteBufBytesReference(buffer, readableBytes);
}
private static class ByteBufBytesReference extends AbstractBytesReference {

View File

@ -19,10 +19,8 @@
package org.elasticsearch.http.nio;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
@ -158,32 +156,25 @@ public class HttpReadWriteHandler implements NioChannelHandler {
private void handleRequest(Object msg) {
final HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = (HttpPipelinedRequest<FullHttpRequest>) msg;
FullHttpRequest request = pipelinedRequest.getRequest();
final FullHttpRequest copiedRequest;
boolean success = false;
NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence());
try {
copiedRequest = new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.copiedBuffer(request.content()),
request.headers(),
request.trailingHeaders());
} finally {
// As we have copied the buffer, we can release the request
request.release();
}
NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence());
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
} else {
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
}
} else {
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
transport.incomingRequest(httpRequest, nioHttpChannel);
}
success = true;
} finally {
if (success == false) {
request.release();
}
} else {
transport.incomingRequest(httpRequest, nioHttpChannel);
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.http.nio;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
@ -28,7 +30,6 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieDecoder;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.rest.RestRequest;
@ -40,6 +41,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class NioHttpRequest implements HttpRequest {
@ -48,16 +50,22 @@ public class NioHttpRequest implements HttpRequest {
private final BytesReference content;
private final HttpHeadersMap headers;
private final int sequence;
private final AtomicBoolean released;
private final boolean pooled;
NioHttpRequest(FullHttpRequest request, int sequence) {
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
ByteBufUtils.toBytesReference(request.content()));
}
private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
BytesReference content) {
this.request = request;
headers = new HttpHeadersMap(request.headers());
this.sequence = sequence;
if (request.content().isReadable()) {
this.content = ByteBufUtils.toBytesReference(request.content());
} else {
this.content = BytesArray.EMPTY;
}
this.headers = headers;
this.content = content;
this.pooled = pooled;
this.released = released;
}
@Override
@ -105,17 +113,32 @@ public class NioHttpRequest implements HttpRequest {
@Override
public BytesReference content() {
assert released.get() == false;
return content;
}
@Override
public void release() {
// NioHttpRequest works from copied unpooled bytes no need to release anything
if (pooled && released.compareAndSet(false, true)) {
request.release();
}
}
@Override
public HttpRequest releaseAndCopy() {
return this;
assert released.get() == false;
if (pooled == false) {
return this;
}
try {
final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
return new NioHttpRequest(
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
request.trailingHeaders()),
headers, sequence, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent));
} finally {
release();
}
}
@Override
@ -156,7 +179,8 @@ public class NioHttpRequest implements HttpRequest {
trailingHeaders.remove(header);
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
return new NioHttpRequest(requestWithoutHeader, sequence);
return new NioHttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
pooled, content);
}
@Override