mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-16 18:04:52 +00:00
Create HttpRequest earlier in pipeline (#56393)
Elasticsearch requires that a HttpRequest abstraction be implemented by http modules before server processing. This abstraction controls when underlying resources are released. This commit moves this abstraction to be created immediately after content aggregation. This change will enable follow-up work including moving Cors logic into the server package and tracking bytes as they are aggregated from the network level.
This commit is contained in:
parent
22f54ba205
commit
57c3a61535
@ -22,10 +22,10 @@ package org.elasticsearch.http.netty4;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.http.HttpPipelinedRequest;
|
||||
import org.elasticsearch.http.HttpPipelinedResponse;
|
||||
import org.elasticsearch.http.HttpPipeliningAggregator;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
@ -37,7 +37,7 @@ import java.util.List;
|
||||
public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
|
||||
|
||||
private final Logger logger;
|
||||
private final HttpPipeliningAggregator<Netty4HttpResponse, ChannelPromise> aggregator;
|
||||
private final HttpPipeliningAggregator<ChannelPromise> aggregator;
|
||||
|
||||
/**
|
||||
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
|
||||
@ -53,20 +53,20 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
|
||||
|
||||
@Override
|
||||
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
|
||||
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
|
||||
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = aggregator.read(((FullHttpRequest) msg));
|
||||
assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass();
|
||||
HttpPipelinedRequest pipelinedRequest = aggregator.read(((Netty4HttpRequest) msg));
|
||||
ctx.fireChannelRead(pipelinedRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
|
||||
assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass();
|
||||
Netty4HttpResponse response = (Netty4HttpResponse) msg;
|
||||
assert msg instanceof HttpPipelinedResponse : "Invalid message type: " + msg.getClass();
|
||||
HttpPipelinedResponse response = (HttpPipelinedResponse) msg;
|
||||
boolean success = false;
|
||||
try {
|
||||
List<Tuple<Netty4HttpResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
|
||||
for (Tuple<Netty4HttpResponse, ChannelPromise> readyResponse : readyResponses) {
|
||||
ctx.write(readyResponse.v1(), readyResponse.v2());
|
||||
List<Tuple<HttpPipelinedResponse, ChannelPromise>> readyResponses = aggregator.write(response, promise);
|
||||
for (Tuple<HttpPipelinedResponse, ChannelPromise> readyResponse : readyResponses) {
|
||||
ctx.write(readyResponse.v1().getDelegateRequest(), readyResponse.v2());
|
||||
}
|
||||
success = true;
|
||||
} catch (IllegalStateException e) {
|
||||
@ -80,11 +80,11 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
|
||||
|
||||
@Override
|
||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
|
||||
List<Tuple<Netty4HttpResponse, ChannelPromise>> inflightResponses = aggregator.removeAllInflightResponses();
|
||||
List<Tuple<HttpPipelinedResponse, ChannelPromise>> inflightResponses = aggregator.removeAllInflightResponses();
|
||||
|
||||
if (inflightResponses.isEmpty() == false) {
|
||||
ClosedChannelException closedChannelException = new ClosedChannelException();
|
||||
for (Tuple<Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
|
||||
for (Tuple<HttpPipelinedResponse, ChannelPromise> inflightResponse : inflightResponses) {
|
||||
try {
|
||||
inflightResponse.v2().setFailure(closedChannelException);
|
||||
} catch (RuntimeException e) {
|
||||
|
@ -46,26 +46,37 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class Netty4HttpRequest implements HttpRequest {
|
||||
private final HttpHeadersMap headers;
|
||||
private final int sequence;
|
||||
private final AtomicBoolean released;
|
||||
private final FullHttpRequest request;
|
||||
private final boolean pooled;
|
||||
private final BytesReference content;
|
||||
|
||||
Netty4HttpRequest(FullHttpRequest request, int sequence) {
|
||||
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
|
||||
private final FullHttpRequest request;
|
||||
private final BytesReference content;
|
||||
private final HttpHeadersMap headers;
|
||||
private final AtomicBoolean released;
|
||||
private final Exception inboundException;
|
||||
private final boolean pooled;
|
||||
|
||||
Netty4HttpRequest(FullHttpRequest request) {
|
||||
this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true,
|
||||
Netty4Utils.toBytesReference(request.content()));
|
||||
}
|
||||
|
||||
private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
|
||||
Netty4HttpRequest(FullHttpRequest request, Exception inboundException) {
|
||||
this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true,
|
||||
Netty4Utils.toBytesReference(request.content()), inboundException);
|
||||
}
|
||||
|
||||
private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled,
|
||||
BytesReference content) {
|
||||
this(request, headers, released, pooled, content, null);
|
||||
}
|
||||
|
||||
private Netty4HttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled,
|
||||
BytesReference content, Exception inboundException) {
|
||||
this.request = request;
|
||||
this.sequence = sequence;
|
||||
this.headers = headers;
|
||||
this.content = content;
|
||||
this.pooled = pooled;
|
||||
this.released = released;
|
||||
this.inboundException = inboundException;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -135,7 +146,7 @@ public class Netty4HttpRequest implements HttpRequest {
|
||||
return new Netty4HttpRequest(
|
||||
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
|
||||
request.trailingHeaders()),
|
||||
headers, sequence, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
|
||||
headers, new AtomicBoolean(false), false, Netty4Utils.toBytesReference(copiedContent));
|
||||
} finally {
|
||||
release();
|
||||
}
|
||||
@ -179,7 +190,7 @@ public class Netty4HttpRequest implements HttpRequest {
|
||||
trailingHeaders.remove(header);
|
||||
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
|
||||
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
|
||||
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
|
||||
return new Netty4HttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), released,
|
||||
pooled, content);
|
||||
}
|
||||
|
||||
@ -188,12 +199,13 @@ public class Netty4HttpRequest implements HttpRequest {
|
||||
return new Netty4HttpResponse(this, status, content);
|
||||
}
|
||||
|
||||
public FullHttpRequest nettyRequest() {
|
||||
return request;
|
||||
@Override
|
||||
public Exception getInboundException() {
|
||||
return inboundException;
|
||||
}
|
||||
|
||||
int sequence() {
|
||||
return sequence;
|
||||
public FullHttpRequest nettyRequest() {
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.netty4;
|
||||
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToMessageDecoder;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
class Netty4HttpRequestCreator extends MessageToMessageDecoder<FullHttpRequest> {
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List<Object> out) {
|
||||
if (msg.decoderResult().isFailure()) {
|
||||
final Throwable cause = msg.decoderResult().cause();
|
||||
final Exception nonError;
|
||||
if (cause instanceof Error) {
|
||||
ExceptionsHelper.maybeDieOnAnotherThread(cause);
|
||||
nonError = new Exception(cause);
|
||||
} else {
|
||||
nonError = (Exception) cause;
|
||||
}
|
||||
out.add(new Netty4HttpRequest(msg.retain(), nonError));
|
||||
} else {
|
||||
out.add(new Netty4HttpRequest(msg.retain()));
|
||||
}
|
||||
}
|
||||
}
|
@ -22,12 +22,11 @@ package org.elasticsearch.http.netty4;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.http.HttpPipelinedRequest;
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
|
||||
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
|
||||
|
||||
private final Netty4HttpServerTransport serverTransport;
|
||||
|
||||
@ -36,23 +35,11 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
|
||||
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
|
||||
FullHttpRequest request = msg.getRequest();
|
||||
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest httpRequest) {
|
||||
final Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
|
||||
boolean success = false;
|
||||
Netty4HttpRequest httpRequest = new Netty4HttpRequest(request, msg.getSequence());
|
||||
try {
|
||||
if (request.decoderResult().isFailure()) {
|
||||
Throwable cause = request.decoderResult().cause();
|
||||
if (cause instanceof Error) {
|
||||
ExceptionsHelper.maybeDieOnAnotherThread(cause);
|
||||
serverTransport.incomingRequestError(httpRequest, channel, new Exception(cause));
|
||||
} else {
|
||||
serverTransport.incomingRequestError(httpRequest, channel, (Exception) cause);
|
||||
}
|
||||
} else {
|
||||
serverTransport.incomingRequest(httpRequest, channel);
|
||||
}
|
||||
serverTransport.incomingRequest(httpRequest, channel);
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
|
@ -22,19 +22,16 @@ package org.elasticsearch.http.netty4;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.http.HttpPipelinedMessage;
|
||||
import org.elasticsearch.http.HttpResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.transport.netty4.Netty4Utils;
|
||||
|
||||
public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse, HttpPipelinedMessage {
|
||||
public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse {
|
||||
|
||||
private final int sequence;
|
||||
private final Netty4HttpRequest request;
|
||||
|
||||
Netty4HttpResponse(Netty4HttpRequest request, RestStatus status, BytesReference content) {
|
||||
super(request.nettyRequest().protocolVersion(), HttpResponseStatus.valueOf(status.getStatus()), Netty4Utils.toByteBuf(content));
|
||||
this.sequence = request.sequence();
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
@ -48,11 +45,6 @@ public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpR
|
||||
return headers().contains(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSequence() {
|
||||
return sequence;
|
||||
}
|
||||
|
||||
public Netty4HttpRequest getRequest() {
|
||||
return request;
|
||||
}
|
||||
|
@ -283,12 +283,14 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
|
||||
|
||||
private final Netty4HttpServerTransport transport;
|
||||
private final Netty4HttpRequestCreator requestCreator;
|
||||
private final Netty4HttpRequestHandler requestHandler;
|
||||
private final HttpHandlingSettings handlingSettings;
|
||||
|
||||
protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
|
||||
this.transport = transport;
|
||||
this.handlingSettings = handlingSettings;
|
||||
this.requestCreator = new Netty4HttpRequestCreator();
|
||||
this.requestHandler = new Netty4HttpRequestHandler(transport);
|
||||
}
|
||||
|
||||
@ -311,6 +313,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
|
||||
if (handlingSettings.isCompression()) {
|
||||
ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
|
||||
}
|
||||
ch.pipeline().addLast("request_creator", requestCreator);
|
||||
if (handlingSettings.isCorsEnabled()) {
|
||||
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
import io.netty.handler.codec.http.HttpMethod;
|
||||
@ -33,6 +32,7 @@ import io.netty.handler.codec.http.HttpResponse;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.http.CorsHandler;
|
||||
import org.elasticsearch.http.netty4.Netty4HttpRequest;
|
||||
import org.elasticsearch.http.netty4.Netty4HttpResponse;
|
||||
|
||||
import java.util.Date;
|
||||
@ -52,7 +52,7 @@ public class Netty4CorsHandler extends ChannelDuplexHandler {
|
||||
private static Pattern SCHEME_PATTERN = Pattern.compile("^https?://");
|
||||
|
||||
private final CorsHandler.Config config;
|
||||
private FullHttpRequest request;
|
||||
private Netty4HttpRequest request;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified {@link CorsHandler.Config}.
|
||||
@ -66,12 +66,12 @@ public class Netty4CorsHandler extends ChannelDuplexHandler {
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
|
||||
assert msg instanceof Netty4HttpRequest : "Invalid message type: " + msg.getClass();
|
||||
if (config.isCorsSupportEnabled()) {
|
||||
request = (FullHttpRequest) msg;
|
||||
if (isPreflightRequest(request)) {
|
||||
request = (Netty4HttpRequest) msg;
|
||||
if (isPreflightRequest(request.nettyRequest())) {
|
||||
try {
|
||||
handlePreflight(ctx, request);
|
||||
handlePreflight(ctx, request.nettyRequest());
|
||||
return;
|
||||
} finally {
|
||||
releaseRequest();
|
||||
@ -79,7 +79,7 @@ public class Netty4CorsHandler extends ChannelDuplexHandler {
|
||||
}
|
||||
if (!validateOrigin()) {
|
||||
try {
|
||||
forbidden(ctx, request);
|
||||
forbidden(ctx, request.nettyRequest());
|
||||
return;
|
||||
} finally {
|
||||
releaseRequest();
|
||||
@ -167,7 +167,7 @@ public class Netty4CorsHandler extends ChannelDuplexHandler {
|
||||
}
|
||||
|
||||
private boolean setOrigin(final HttpResponse response) {
|
||||
final String origin = request.headers().get(HttpHeaderNames.ORIGIN);
|
||||
final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN);
|
||||
if (!Strings.isNullOrEmpty(origin)) {
|
||||
if (config.isAnyOriginSupported()) {
|
||||
if (config.isCredentialsAllowed()) {
|
||||
@ -192,14 +192,14 @@ public class Netty4CorsHandler extends ChannelDuplexHandler {
|
||||
return true;
|
||||
}
|
||||
|
||||
final String origin = request.headers().get(HttpHeaderNames.ORIGIN);
|
||||
final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN);
|
||||
if (Strings.isNullOrEmpty(origin)) {
|
||||
// Not a CORS request so we cannot validate it. It may be a non CORS request.
|
||||
return true;
|
||||
}
|
||||
|
||||
// if the origin is the same as the host of the request, then allow
|
||||
if (isSameOrigin(origin, request.headers().get(HttpHeaderNames.HOST))) {
|
||||
if (isSameOrigin(origin, request.nettyRequest().headers().get(HttpHeaderNames.HOST))) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -207,7 +207,7 @@ public class Netty4CorsHandler extends ChannelDuplexHandler {
|
||||
}
|
||||
|
||||
private void echoRequestOrigin(final HttpResponse response) {
|
||||
setOrigin(response, request.headers().get(HttpHeaderNames.ORIGIN));
|
||||
setOrigin(response, request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN));
|
||||
}
|
||||
|
||||
private static void setVaryHeader(final HttpResponse response) {
|
||||
|
@ -142,7 +142,7 @@ public class Netty4CorsTests extends ESTestCase {
|
||||
httpRequest.headers().add(HttpHeaderNames.HOST, host);
|
||||
EmbeddedChannel embeddedChannel = new EmbeddedChannel();
|
||||
embeddedChannel.pipeline().addLast(new Netty4CorsHandler(CorsHandler.fromSettings(settings)));
|
||||
Netty4HttpRequest nettyRequest = new Netty4HttpRequest(httpRequest, 0);
|
||||
Netty4HttpRequest nettyRequest = new Netty4HttpRequest(httpRequest);
|
||||
embeddedChannel.writeOutbound(nettyRequest.createResponse(RestStatus.OK, new BytesArray("content")));
|
||||
return embeddedChannel.readOutbound();
|
||||
}
|
||||
|
@ -25,16 +25,15 @@ import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
import io.netty.handler.codec.http.HttpMethod;
|
||||
import io.netty.handler.codec.http.HttpRequest;
|
||||
import io.netty.handler.codec.http.LastHttpContent;
|
||||
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.http.HttpPipelinedRequest;
|
||||
import org.elasticsearch.http.HttpPipelinedResponse;
|
||||
import org.elasticsearch.http.HttpResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.After;
|
||||
@ -44,12 +43,10 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedTransferQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
@ -182,8 +179,8 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
|
||||
embeddedChannel.writeInbound(createHttpRequest("/" + i));
|
||||
}
|
||||
|
||||
HttpPipelinedRequest<FullHttpRequest> inbound;
|
||||
ArrayList<HttpPipelinedRequest<FullHttpRequest>> requests = new ArrayList<>();
|
||||
HttpPipelinedRequest inbound;
|
||||
ArrayList<HttpPipelinedRequest> requests = new ArrayList<>();
|
||||
while ((inbound = embeddedChannel.readInbound()) != null) {
|
||||
requests.add(inbound);
|
||||
}
|
||||
@ -192,9 +189,8 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
|
||||
for (int i = 1; i < requests.size(); ++i) {
|
||||
ChannelPromise promise = embeddedChannel.newPromise();
|
||||
promises.add(promise);
|
||||
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = requests.get(i);
|
||||
Netty4HttpRequest nioHttpRequest = new Netty4HttpRequest(pipelinedRequest.getRequest(), pipelinedRequest.getSequence());
|
||||
Netty4HttpResponse resp = nioHttpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY);
|
||||
HttpPipelinedRequest pipelinedRequest = requests.get(i);
|
||||
HttpPipelinedResponse resp = pipelinedRequest.createResponse(RestStatus.OK, BytesArray.EMPTY);
|
||||
embeddedChannel.writeAndFlush(resp, promise);
|
||||
}
|
||||
|
||||
@ -217,37 +213,20 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
|
||||
assertThat(data, is(expectedContent));
|
||||
}
|
||||
|
||||
private FullHttpRequest createHttpRequest(String uri) {
|
||||
return new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri);
|
||||
private Netty4HttpRequest createHttpRequest(String uri) {
|
||||
return new Netty4HttpRequest(new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri));
|
||||
}
|
||||
|
||||
private static class AggregateUrisAndHeadersHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||
|
||||
static final Queue<String> QUEUE_URI = new LinkedTransferQueue<>();
|
||||
private class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception {
|
||||
QUEUE_URI.add(request.uri());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
|
||||
|
||||
@Override
|
||||
protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> pipelinedRequest) {
|
||||
LastHttpContent request = pipelinedRequest.getRequest();
|
||||
final QueryStringDecoder decoder;
|
||||
if (request instanceof FullHttpRequest) {
|
||||
decoder = new QueryStringDecoder(((FullHttpRequest)request).uri());
|
||||
} else {
|
||||
decoder = new QueryStringDecoder(AggregateUrisAndHeadersHandler.QUEUE_URI.poll());
|
||||
}
|
||||
protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) {
|
||||
final org.elasticsearch.http.HttpRequest request = pipelinedRequest.getDelegateRequest();
|
||||
final QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
|
||||
|
||||
final String uri = decoder.path().replace("/", "");
|
||||
final BytesReference content = new BytesArray(uri.getBytes(StandardCharsets.UTF_8));
|
||||
Netty4HttpRequest nioHttpRequest = new Netty4HttpRequest(pipelinedRequest.getRequest(), pipelinedRequest.getSequence());
|
||||
Netty4HttpResponse httpResponse = nioHttpRequest.createResponse(RestStatus.OK, content);
|
||||
HttpResponse httpResponse = pipelinedRequest.createResponse(RestStatus.OK, content);
|
||||
httpResponse.addHeader(CONTENT_LENGTH.toString(), Integer.toString(content.length()));
|
||||
|
||||
final CountDownLatch waitingLatch = new CountDownLatch(1);
|
||||
|
@ -26,9 +26,7 @@ import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
@ -38,6 +36,7 @@ import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.common.util.MockPageCacheRecycler;
|
||||
import org.elasticsearch.http.HttpPipelinedRequest;
|
||||
import org.elasticsearch.http.HttpResponse;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.http.NullDispatcher;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
@ -154,7 +153,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
|
||||
|
||||
}
|
||||
|
||||
class PossiblySlowUpstreamHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
|
||||
class PossiblySlowUpstreamHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
|
||||
|
||||
private final ExecutorService executorService;
|
||||
|
||||
@ -163,7 +162,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
|
||||
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest msg) throws Exception {
|
||||
executorService.submit(new PossiblySlowRunnable(ctx, msg));
|
||||
}
|
||||
|
||||
@ -178,26 +177,23 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
|
||||
class PossiblySlowRunnable implements Runnable {
|
||||
|
||||
private ChannelHandlerContext ctx;
|
||||
private HttpPipelinedRequest<FullHttpRequest> pipelinedRequest;
|
||||
private FullHttpRequest fullHttpRequest;
|
||||
private HttpPipelinedRequest pipelinedRequest;
|
||||
|
||||
PossiblySlowRunnable(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) {
|
||||
PossiblySlowRunnable(ChannelHandlerContext ctx, HttpPipelinedRequest msg) {
|
||||
this.ctx = ctx;
|
||||
this.pipelinedRequest = msg;
|
||||
this.fullHttpRequest = pipelinedRequest.getRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final String uri = fullHttpRequest.uri();
|
||||
final String uri = pipelinedRequest.uri();
|
||||
|
||||
final ByteBuf buffer = Unpooled.copiedBuffer(uri, StandardCharsets.UTF_8);
|
||||
|
||||
Netty4HttpRequest httpRequest = new Netty4HttpRequest(fullHttpRequest, pipelinedRequest.getSequence());
|
||||
Netty4HttpResponse response =
|
||||
httpRequest.createResponse(RestStatus.OK, new BytesArray(uri.getBytes(StandardCharsets.UTF_8)));
|
||||
response.headers().add(HttpHeaderNames.CONTENT_LENGTH, buffer.readableBytes());
|
||||
HttpResponse response =
|
||||
pipelinedRequest.createResponse(RestStatus.OK, new BytesArray(uri.getBytes(StandardCharsets.UTF_8)));
|
||||
response.addHeader("content-length", Integer.toString(buffer.readableBytes()));
|
||||
|
||||
final boolean slow = uri.matches("/slow/\\d+");
|
||||
if (slow) {
|
||||
@ -213,7 +209,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
|
||||
final ChannelPromise promise = ctx.newPromise();
|
||||
ctx.writeAndFlush(response, promise);
|
||||
} finally {
|
||||
fullHttpRequest.release();
|
||||
pipelinedRequest.release();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,17 +21,16 @@ package org.elasticsearch.http.nio;
|
||||
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.HttpContentCompressor;
|
||||
import io.netty.handler.codec.http.HttpContentDecompressor;
|
||||
import io.netty.handler.codec.http.HttpObjectAggregator;
|
||||
import io.netty.handler.codec.http.HttpRequestDecoder;
|
||||
import io.netty.handler.codec.http.HttpResponseEncoder;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.http.CorsHandler;
|
||||
import org.elasticsearch.http.HttpHandlingSettings;
|
||||
import org.elasticsearch.http.HttpPipelinedRequest;
|
||||
import org.elasticsearch.http.HttpPipelinedResponse;
|
||||
import org.elasticsearch.http.HttpReadTimeoutException;
|
||||
import org.elasticsearch.http.nio.cors.NioCorsHandler;
|
||||
import org.elasticsearch.nio.FlushOperation;
|
||||
@ -68,7 +67,7 @@ public class HttpReadWriteHandler implements NioChannelHandler {
|
||||
this.nanoClock = nanoClock;
|
||||
this.readTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(settings.getReadTimeoutMillis());
|
||||
|
||||
List<ChannelHandler> handlers = new ArrayList<>(5);
|
||||
List<ChannelHandler> handlers = new ArrayList<>(8);
|
||||
HttpRequestDecoder decoder = new HttpRequestDecoder(settings.getMaxInitialLineLength(), settings.getMaxHeaderSize(),
|
||||
settings.getMaxChunkSize());
|
||||
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
|
||||
@ -79,6 +78,7 @@ public class HttpReadWriteHandler implements NioChannelHandler {
|
||||
if (settings.isCompression()) {
|
||||
handlers.add(new HttpContentCompressor(settings.getCompressionLevel()));
|
||||
}
|
||||
handlers.add(new NioHttpRequestCreator());
|
||||
if (settings.isCorsEnabled()) {
|
||||
handlers.add(new NioCorsHandler(corsConfig));
|
||||
}
|
||||
@ -112,15 +112,13 @@ public class HttpReadWriteHandler implements NioChannelHandler {
|
||||
|
||||
@Override
|
||||
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
|
||||
assert message instanceof NioHttpResponse : "This channel only supports messages that are of type: "
|
||||
+ NioHttpResponse.class + ". Found type: " + message.getClass() + ".";
|
||||
return new HttpWriteOperation(context, (NioHttpResponse) message, listener);
|
||||
assert assertMessageTypes(message);
|
||||
return new HttpWriteOperation(context, (HttpPipelinedResponse) message, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<FlushOperation> writeToBytes(WriteOperation writeOperation) {
|
||||
assert writeOperation.getObject() instanceof NioHttpResponse : "This channel only supports messages that are of type: "
|
||||
+ NioHttpResponse.class + ". Found type: " + writeOperation.getObject().getClass() + ".";
|
||||
assert assertMessageTypes(writeOperation.getObject());
|
||||
assert channelActive : "channelActive should have been called";
|
||||
--inFlightRequests;
|
||||
assert inFlightRequests >= 0 : "Inflight requests should never drop below zero, found: " + inFlightRequests;
|
||||
@ -154,26 +152,14 @@ public class HttpReadWriteHandler implements NioChannelHandler {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private void handleRequest(Object msg) {
|
||||
final HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = (HttpPipelinedRequest<FullHttpRequest>) msg;
|
||||
FullHttpRequest request = pipelinedRequest.getRequest();
|
||||
final HttpPipelinedRequest pipelinedRequest = (HttpPipelinedRequest) msg;
|
||||
boolean success = false;
|
||||
NioHttpRequest httpRequest = new NioHttpRequest(request, pipelinedRequest.getSequence());
|
||||
try {
|
||||
if (request.decoderResult().isFailure()) {
|
||||
Throwable cause = request.decoderResult().cause();
|
||||
if (cause instanceof Error) {
|
||||
ExceptionsHelper.maybeDieOnAnotherThread(cause);
|
||||
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
|
||||
} else {
|
||||
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
|
||||
}
|
||||
} else {
|
||||
transport.incomingRequest(httpRequest, nioHttpChannel);
|
||||
}
|
||||
transport.incomingRequest(pipelinedRequest, nioHttpChannel);
|
||||
success = true;
|
||||
} finally {
|
||||
if (success == false) {
|
||||
request.release();
|
||||
pipelinedRequest.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -190,4 +176,13 @@ public class HttpReadWriteHandler implements NioChannelHandler {
|
||||
private void scheduleReadTimeout() {
|
||||
taskScheduler.scheduleAtRelativeTime(this::maybeReadTimeout, nanoClock.getAsLong() + readTimeoutNanos);
|
||||
}
|
||||
|
||||
private static boolean assertMessageTypes(Object message) {
|
||||
assert message instanceof HttpPipelinedResponse : "This channel only supports messages that are of type: "
|
||||
+ HttpPipelinedResponse.class + ". Found type: " + message.getClass() + ".";
|
||||
assert ((HttpPipelinedResponse) message).getDelegateRequest() instanceof NioHttpResponse :
|
||||
"This channel only pipelined responses with a delegate of type: " + NioHttpResponse.class +
|
||||
". Found type: " + ((HttpPipelinedResponse) message).getDelegateRequest().getClass() + ".";
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.http.nio;
|
||||
|
||||
import org.elasticsearch.http.HttpPipelinedResponse;
|
||||
import org.elasticsearch.nio.SocketChannelContext;
|
||||
import org.elasticsearch.nio.WriteOperation;
|
||||
|
||||
@ -27,10 +28,10 @@ import java.util.function.BiConsumer;
|
||||
public class HttpWriteOperation implements WriteOperation {
|
||||
|
||||
private final SocketChannelContext channelContext;
|
||||
private final NioHttpResponse response;
|
||||
private final HttpPipelinedResponse response;
|
||||
private final BiConsumer<Void, Exception> listener;
|
||||
|
||||
HttpWriteOperation(SocketChannelContext channelContext, NioHttpResponse response, BiConsumer<Void, Exception> listener) {
|
||||
HttpWriteOperation(SocketChannelContext channelContext, HttpPipelinedResponse response, BiConsumer<Void, Exception> listener) {
|
||||
this.channelContext = channelContext;
|
||||
this.response = response;
|
||||
this.listener = listener;
|
||||
@ -47,7 +48,7 @@ public class HttpWriteOperation implements WriteOperation {
|
||||
}
|
||||
|
||||
@Override
|
||||
public NioHttpResponse getObject() {
|
||||
public HttpPipelinedResponse getObject() {
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
@ -22,11 +22,12 @@ package org.elasticsearch.http.nio;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.http.HttpPipelinedRequest;
|
||||
import org.elasticsearch.http.HttpPipelinedResponse;
|
||||
import org.elasticsearch.http.HttpPipeliningAggregator;
|
||||
import org.elasticsearch.http.HttpRequest;
|
||||
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.List;
|
||||
@ -37,7 +38,7 @@ import java.util.List;
|
||||
public class NioHttpPipeliningHandler extends ChannelDuplexHandler {
|
||||
|
||||
private final Logger logger;
|
||||
private final HttpPipeliningAggregator<NioHttpResponse, NettyListener> aggregator;
|
||||
private final HttpPipeliningAggregator<NettyListener> aggregator;
|
||||
|
||||
/**
|
||||
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
|
||||
@ -53,22 +54,22 @@ public class NioHttpPipeliningHandler extends ChannelDuplexHandler {
|
||||
|
||||
@Override
|
||||
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
|
||||
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
|
||||
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = aggregator.read(((FullHttpRequest) msg));
|
||||
assert msg instanceof HttpRequest : "Invalid message type: " + msg.getClass();
|
||||
HttpPipelinedRequest pipelinedRequest = aggregator.read((HttpRequest) msg);
|
||||
ctx.fireChannelRead(pipelinedRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
|
||||
assert msg instanceof NioHttpResponse : "Invalid message type: " + msg.getClass();
|
||||
NioHttpResponse response = (NioHttpResponse) msg;
|
||||
assert msg instanceof HttpPipelinedResponse : "Invalid message type: " + msg.getClass();
|
||||
HttpPipelinedResponse response = (HttpPipelinedResponse) msg;
|
||||
boolean success = false;
|
||||
try {
|
||||
NettyListener listener = NettyListener.fromChannelPromise(promise);
|
||||
List<Tuple<NioHttpResponse, NettyListener>> readyResponses = aggregator.write(response, listener);
|
||||
List<Tuple<HttpPipelinedResponse, NettyListener>> readyResponses = aggregator.write(response, listener);
|
||||
success = true;
|
||||
for (Tuple<NioHttpResponse, NettyListener> responseToWrite : readyResponses) {
|
||||
ctx.write(responseToWrite.v1(), responseToWrite.v2());
|
||||
for (Tuple<HttpPipelinedResponse, NettyListener> responseToWrite : readyResponses) {
|
||||
ctx.write(responseToWrite.v1().getDelegateRequest(), responseToWrite.v2());
|
||||
}
|
||||
} catch (IllegalStateException e) {
|
||||
ctx.channel().close();
|
||||
@ -81,11 +82,11 @@ public class NioHttpPipeliningHandler extends ChannelDuplexHandler {
|
||||
|
||||
@Override
|
||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
|
||||
List<Tuple<NioHttpResponse, NettyListener>> inflightResponses = aggregator.removeAllInflightResponses();
|
||||
List<Tuple<HttpPipelinedResponse, NettyListener>> inflightResponses = aggregator.removeAllInflightResponses();
|
||||
|
||||
if (inflightResponses.isEmpty() == false) {
|
||||
ClosedChannelException closedChannelException = new ClosedChannelException();
|
||||
for (Tuple<NioHttpResponse, NettyListener> inflightResponse : inflightResponses) {
|
||||
for (Tuple<HttpPipelinedResponse, NettyListener> inflightResponse : inflightResponses) {
|
||||
try {
|
||||
inflightResponse.v2().setFailure(closedChannelException);
|
||||
} catch (RuntimeException e) {
|
||||
|
@ -49,23 +49,33 @@ public class NioHttpRequest implements HttpRequest {
|
||||
private final FullHttpRequest request;
|
||||
private final BytesReference content;
|
||||
private final HttpHeadersMap headers;
|
||||
private final int sequence;
|
||||
private final AtomicBoolean released;
|
||||
private final Exception inboundException;
|
||||
private final boolean pooled;
|
||||
|
||||
NioHttpRequest(FullHttpRequest request, int sequence) {
|
||||
this(request, new HttpHeadersMap(request.headers()), sequence, new AtomicBoolean(false), true,
|
||||
NioHttpRequest(FullHttpRequest request) {
|
||||
this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true,
|
||||
ByteBufUtils.toBytesReference(request.content()));
|
||||
}
|
||||
|
||||
private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, int sequence, AtomicBoolean released, boolean pooled,
|
||||
BytesReference content) {
|
||||
NioHttpRequest(FullHttpRequest request, Exception inboundException) {
|
||||
this(request, new HttpHeadersMap(request.headers()), new AtomicBoolean(false), true,
|
||||
ByteBufUtils.toBytesReference(request.content()), inboundException);
|
||||
}
|
||||
|
||||
private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled,
|
||||
BytesReference content) {
|
||||
this(request, headers, released, pooled, content, null);
|
||||
}
|
||||
|
||||
private NioHttpRequest(FullHttpRequest request, HttpHeadersMap headers, AtomicBoolean released, boolean pooled,
|
||||
BytesReference content, Exception inboundException) {
|
||||
this.request = request;
|
||||
this.sequence = sequence;
|
||||
this.headers = headers;
|
||||
this.content = content;
|
||||
this.pooled = pooled;
|
||||
this.released = released;
|
||||
this.inboundException = inboundException;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -135,7 +145,7 @@ public class NioHttpRequest implements HttpRequest {
|
||||
return new NioHttpRequest(
|
||||
new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(), copiedContent, request.headers(),
|
||||
request.trailingHeaders()),
|
||||
headers, sequence, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent));
|
||||
headers, new AtomicBoolean(false), false, ByteBufUtils.toBytesReference(copiedContent));
|
||||
} finally {
|
||||
release();
|
||||
}
|
||||
@ -179,8 +189,7 @@ public class NioHttpRequest implements HttpRequest {
|
||||
trailingHeaders.remove(header);
|
||||
FullHttpRequest requestWithoutHeader = new DefaultFullHttpRequest(request.protocolVersion(), request.method(), request.uri(),
|
||||
request.content(), headersWithoutContentTypeHeader, trailingHeaders);
|
||||
return new NioHttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), sequence, released,
|
||||
pooled, content);
|
||||
return new NioHttpRequest(requestWithoutHeader, new HttpHeadersMap(requestWithoutHeader.headers()), released, pooled, content);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -188,12 +197,13 @@ public class NioHttpRequest implements HttpRequest {
|
||||
return new NioHttpResponse(this, status, content);
|
||||
}
|
||||
|
||||
public FullHttpRequest nettyRequest() {
|
||||
return request;
|
||||
@Override
|
||||
public Exception getInboundException() {
|
||||
return inboundException;
|
||||
}
|
||||
|
||||
int sequence() {
|
||||
return sequence;
|
||||
public FullHttpRequest nettyRequest() {
|
||||
return request;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http.nio;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.MessageToMessageDecoder;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
class NioHttpRequestCreator extends MessageToMessageDecoder<FullHttpRequest> {
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List<Object> out) {
|
||||
if (msg.decoderResult().isFailure()) {
|
||||
final Throwable cause = msg.decoderResult().cause();
|
||||
final Exception nonError;
|
||||
if (cause instanceof Error) {
|
||||
ExceptionsHelper.maybeDieOnAnotherThread(cause);
|
||||
nonError = new Exception(cause);
|
||||
} else {
|
||||
nonError = (Exception) cause;
|
||||
}
|
||||
out.add(new NioHttpRequest(msg.retain(), nonError));
|
||||
} else {
|
||||
out.add(new NioHttpRequest(msg.retain()));
|
||||
}
|
||||
}
|
||||
}
|
@ -22,18 +22,15 @@ package org.elasticsearch.http.nio;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.http.HttpPipelinedMessage;
|
||||
import org.elasticsearch.http.HttpResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
public class NioHttpResponse extends DefaultFullHttpResponse implements HttpResponse, HttpPipelinedMessage {
|
||||
public class NioHttpResponse extends DefaultFullHttpResponse implements HttpResponse {
|
||||
|
||||
private final int sequence;
|
||||
private final NioHttpRequest request;
|
||||
|
||||
NioHttpResponse(NioHttpRequest request, RestStatus status, BytesReference content) {
|
||||
super(request.nettyRequest().protocolVersion(), HttpResponseStatus.valueOf(status.getStatus()), ByteBufUtils.toByteBuf(content));
|
||||
this.sequence = request.sequence();
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
@ -47,11 +44,6 @@ public class NioHttpResponse extends DefaultFullHttpResponse implements HttpResp
|
||||
return headers().contains(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSequence() {
|
||||
return sequence;
|
||||
}
|
||||
|
||||
public NioHttpRequest getRequest() {
|
||||
return request;
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
import io.netty.handler.codec.http.HttpMethod;
|
||||
@ -33,6 +32,7 @@ import io.netty.handler.codec.http.HttpResponse;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.http.CorsHandler;
|
||||
import org.elasticsearch.http.nio.NioHttpRequest;
|
||||
import org.elasticsearch.http.nio.NioHttpResponse;
|
||||
|
||||
import java.util.Date;
|
||||
@ -53,7 +53,7 @@ public class NioCorsHandler extends ChannelDuplexHandler {
|
||||
private static Pattern SCHEME_PATTERN = Pattern.compile("^https?://");
|
||||
|
||||
private final CorsHandler.Config config;
|
||||
private FullHttpRequest request;
|
||||
private NioHttpRequest request;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified {@link CorsHandler.Config}.
|
||||
@ -67,12 +67,12 @@ public class NioCorsHandler extends ChannelDuplexHandler {
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
|
||||
assert msg instanceof NioHttpRequest : "Invalid message type: " + msg.getClass();
|
||||
if (config.isCorsSupportEnabled()) {
|
||||
request = (FullHttpRequest) msg;
|
||||
if (isPreflightRequest(request)) {
|
||||
request = (NioHttpRequest) msg;
|
||||
if (isPreflightRequest(request.nettyRequest())) {
|
||||
try {
|
||||
handlePreflight(ctx, request);
|
||||
handlePreflight(ctx, request.nettyRequest());
|
||||
return;
|
||||
} finally {
|
||||
releaseRequest();
|
||||
@ -80,7 +80,7 @@ public class NioCorsHandler extends ChannelDuplexHandler {
|
||||
}
|
||||
if (!validateOrigin()) {
|
||||
try {
|
||||
forbidden(ctx, request);
|
||||
forbidden(ctx, request.nettyRequest());
|
||||
return;
|
||||
} finally {
|
||||
releaseRequest();
|
||||
@ -168,7 +168,7 @@ public class NioCorsHandler extends ChannelDuplexHandler {
|
||||
}
|
||||
|
||||
private boolean setOrigin(final HttpResponse response) {
|
||||
final String origin = request.headers().get(HttpHeaderNames.ORIGIN);
|
||||
final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN);
|
||||
if (!Strings.isNullOrEmpty(origin)) {
|
||||
if (config.isAnyOriginSupported()) {
|
||||
if (config.isCredentialsAllowed()) {
|
||||
@ -193,14 +193,14 @@ public class NioCorsHandler extends ChannelDuplexHandler {
|
||||
return true;
|
||||
}
|
||||
|
||||
final String origin = request.headers().get(HttpHeaderNames.ORIGIN);
|
||||
final String origin = request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN);
|
||||
if (Strings.isNullOrEmpty(origin)) {
|
||||
// Not a CORS request so we cannot validate it. It may be a non CORS request.
|
||||
return true;
|
||||
}
|
||||
|
||||
// if the origin is the same as the host of the request, then allow
|
||||
if (isSameOrigin(origin, request.headers().get(HttpHeaderNames.HOST))) {
|
||||
if (isSameOrigin(origin, request.nettyRequest().headers().get(HttpHeaderNames.HOST))) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -208,7 +208,7 @@ public class NioCorsHandler extends ChannelDuplexHandler {
|
||||
}
|
||||
|
||||
private void echoRequestOrigin(final HttpResponse response) {
|
||||
setOrigin(response, request.headers().get(HttpHeaderNames.ORIGIN));
|
||||
setOrigin(response, request.nettyRequest().headers().get(HttpHeaderNames.ORIGIN));
|
||||
}
|
||||
|
||||
private static void setVaryHeader(final HttpResponse response) {
|
||||
|
@ -40,6 +40,8 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.http.CorsHandler;
|
||||
import org.elasticsearch.http.HttpChannel;
|
||||
import org.elasticsearch.http.HttpHandlingSettings;
|
||||
import org.elasticsearch.http.HttpPipelinedRequest;
|
||||
import org.elasticsearch.http.HttpPipelinedResponse;
|
||||
import org.elasticsearch.http.HttpReadTimeoutException;
|
||||
import org.elasticsearch.http.HttpRequest;
|
||||
import org.elasticsearch.http.HttpResponse;
|
||||
@ -114,7 +116,7 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||
ByteBuf buf = requestEncoder.encode(httpRequest);
|
||||
int slicePoint = randomInt(buf.writerIndex() - 1);
|
||||
ByteBuf slicedBuf = buf.retainedSlice(0, slicePoint);
|
||||
ByteBuf slicedBuf2 = buf.retainedSlice(slicePoint, buf.writerIndex());
|
||||
ByteBuf slicedBuf2 = buf.retainedSlice(slicePoint, buf.writerIndex() - slicePoint);
|
||||
try {
|
||||
handler.consumeReads(toChannelBuffer(slicedBuf));
|
||||
|
||||
@ -148,10 +150,11 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||
|
||||
handler.consumeReads(toChannelBuffer(buf));
|
||||
|
||||
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
|
||||
verify(transport).incomingRequestError(any(HttpRequest.class), any(NioHttpChannel.class), exceptionCaptor.capture());
|
||||
ArgumentCaptor<HttpRequest> requestCaptor = ArgumentCaptor.forClass(HttpRequest.class);
|
||||
verify(transport).incomingRequest(requestCaptor.capture(), any(NioHttpChannel.class));
|
||||
|
||||
assertTrue(exceptionCaptor.getValue() instanceof IllegalArgumentException);
|
||||
assertNotNull(requestCaptor.getValue().getInboundException());
|
||||
assertTrue(requestCaptor.getValue().getInboundException() instanceof IllegalArgumentException);
|
||||
} finally {
|
||||
buf.release();
|
||||
}
|
||||
@ -169,7 +172,6 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||
} finally {
|
||||
buf.release();
|
||||
}
|
||||
verify(transport, times(0)).incomingRequestError(any(), any(), any());
|
||||
verify(transport, times(0)).incomingRequest(any(), any());
|
||||
|
||||
List<FlushOperation> flushOperations = handler.pollFlushOperations();
|
||||
@ -193,7 +195,7 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testEncodeHttpResponse() throws IOException {
|
||||
prepareHandlerForResponse(handler);
|
||||
NioHttpResponse httpResponse = emptyGetResponse(0);
|
||||
HttpPipelinedResponse httpResponse = emptyGetResponse(0);
|
||||
|
||||
SocketChannelContext context = mock(SocketChannelContext.class);
|
||||
HttpWriteOperation writeOperation = new HttpWriteOperation(context, httpResponse, mock(BiConsumer.class));
|
||||
@ -372,10 +374,10 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||
assertNull(taskScheduler.pollTask(timeValue.getNanos() + 9));
|
||||
}
|
||||
|
||||
private static NioHttpResponse emptyGetResponse(int sequenceNumber) {
|
||||
private static HttpPipelinedResponse emptyGetResponse(int sequence) {
|
||||
DefaultFullHttpRequest nettyRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
|
||||
NioHttpRequest nioHttpRequest = new NioHttpRequest(nettyRequest, sequenceNumber);
|
||||
NioHttpResponse httpResponse = nioHttpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY);
|
||||
HttpPipelinedRequest httpRequest = new HttpPipelinedRequest(sequence, new NioHttpRequest(nettyRequest));
|
||||
HttpPipelinedResponse httpResponse = httpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY);
|
||||
httpResponse.addHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), "0");
|
||||
return httpResponse;
|
||||
}
|
||||
@ -392,9 +394,9 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||
httpRequest.headers().add(HttpHeaderNames.ORIGIN, originValue);
|
||||
}
|
||||
httpRequest.headers().add(HttpHeaderNames.HOST, host);
|
||||
NioHttpRequest nioHttpRequest = new NioHttpRequest(httpRequest, 0);
|
||||
HttpPipelinedRequest pipelinedRequest = new HttpPipelinedRequest(0, new NioHttpRequest(httpRequest));
|
||||
BytesArray content = new BytesArray("content");
|
||||
HttpResponse response = nioHttpRequest.createResponse(RestStatus.OK, content);
|
||||
HttpResponse response = pipelinedRequest.createResponse(RestStatus.OK, content);
|
||||
response.addHeader("Content-Length", Integer.toString(content.length()));
|
||||
|
||||
SocketChannelContext context = mock(SocketChannelContext.class);
|
||||
@ -420,18 +422,18 @@ public class HttpReadWriteHandlerTests extends ESTestCase {
|
||||
buf.release();
|
||||
}
|
||||
|
||||
ArgumentCaptor<NioHttpRequest> requestCaptor = ArgumentCaptor.forClass(NioHttpRequest.class);
|
||||
ArgumentCaptor<HttpPipelinedRequest> requestCaptor = ArgumentCaptor.forClass(HttpPipelinedRequest.class);
|
||||
verify(transport, atLeastOnce()).incomingRequest(requestCaptor.capture(), any(HttpChannel.class));
|
||||
|
||||
NioHttpRequest nioHttpRequest = requestCaptor.getValue();
|
||||
assertNotNull(nioHttpRequest);
|
||||
assertEquals(method.name(), nioHttpRequest.method().name());
|
||||
HttpRequest httpRequest = requestCaptor.getValue();
|
||||
assertNotNull(httpRequest);
|
||||
assertEquals(method.name(), httpRequest.method().name());
|
||||
if (version == HttpVersion.HTTP_1_1) {
|
||||
assertEquals(HttpRequest.HttpVersion.HTTP_1_1, nioHttpRequest.protocolVersion());
|
||||
assertEquals(HttpRequest.HttpVersion.HTTP_1_1, httpRequest.protocolVersion());
|
||||
} else {
|
||||
assertEquals(HttpRequest.HttpVersion.HTTP_1_0, nioHttpRequest.protocolVersion());
|
||||
assertEquals(HttpRequest.HttpVersion.HTTP_1_0, httpRequest.protocolVersion());
|
||||
}
|
||||
assertEquals(nioHttpRequest.uri(), uri);
|
||||
assertEquals(httpRequest.uri(), uri);
|
||||
}
|
||||
|
||||
private InboundChannelBuffer toChannelBuffer(ByteBuf buf) {
|
||||
|
@ -25,16 +25,16 @@ import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
import io.netty.handler.codec.http.HttpMethod;
|
||||
import io.netty.handler.codec.http.HttpRequest;
|
||||
import io.netty.handler.codec.http.LastHttpContent;
|
||||
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.http.HttpPipelinedRequest;
|
||||
import org.elasticsearch.http.HttpPipelinedResponse;
|
||||
import org.elasticsearch.http.HttpRequest;
|
||||
import org.elasticsearch.http.HttpResponse;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.After;
|
||||
@ -44,12 +44,10 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedTransferQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
@ -172,7 +170,7 @@ public class NioHttpPipeliningHandlerTests extends ESTestCase {
|
||||
assertFalse(embeddedChannel.isOpen());
|
||||
}
|
||||
|
||||
public void testPipeliningRequestsAreReleased() throws InterruptedException {
|
||||
public void testPipeliningRequestsAreReleased() {
|
||||
final int numberOfRequests = 10;
|
||||
final EmbeddedChannel embeddedChannel =
|
||||
new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests + 1));
|
||||
@ -181,8 +179,8 @@ public class NioHttpPipeliningHandlerTests extends ESTestCase {
|
||||
embeddedChannel.writeInbound(createHttpRequest("/" + i));
|
||||
}
|
||||
|
||||
HttpPipelinedRequest<FullHttpRequest> inbound;
|
||||
ArrayList<HttpPipelinedRequest<FullHttpRequest>> requests = new ArrayList<>();
|
||||
HttpPipelinedRequest inbound;
|
||||
ArrayList<HttpPipelinedRequest> requests = new ArrayList<>();
|
||||
while ((inbound = embeddedChannel.readInbound()) != null) {
|
||||
requests.add(inbound);
|
||||
}
|
||||
@ -191,9 +189,8 @@ public class NioHttpPipeliningHandlerTests extends ESTestCase {
|
||||
for (int i = 1; i < requests.size(); ++i) {
|
||||
ChannelPromise promise = embeddedChannel.newPromise();
|
||||
promises.add(promise);
|
||||
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = requests.get(i);
|
||||
NioHttpRequest nioHttpRequest = new NioHttpRequest(pipelinedRequest.getRequest(), pipelinedRequest.getSequence());
|
||||
NioHttpResponse resp = nioHttpRequest.createResponse(RestStatus.OK, BytesArray.EMPTY);
|
||||
HttpPipelinedRequest pipelinedRequest = requests.get(i);
|
||||
HttpPipelinedResponse resp = pipelinedRequest.createResponse(RestStatus.OK, BytesArray.EMPTY);
|
||||
embeddedChannel.writeAndFlush(resp, promise);
|
||||
}
|
||||
|
||||
@ -215,37 +212,20 @@ public class NioHttpPipeliningHandlerTests extends ESTestCase {
|
||||
assertThat(data, is(expectedContent));
|
||||
}
|
||||
|
||||
private FullHttpRequest createHttpRequest(String uri) {
|
||||
return new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri);
|
||||
private NioHttpRequest createHttpRequest(String uri) {
|
||||
return new NioHttpRequest(new DefaultFullHttpRequest(HTTP_1_1, HttpMethod.GET, uri));
|
||||
}
|
||||
|
||||
private static class AggregateUrisAndHeadersHandler extends SimpleChannelInboundHandler<HttpRequest> {
|
||||
|
||||
static final Queue<String> QUEUE_URI = new LinkedTransferQueue<>();
|
||||
private class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) throws Exception {
|
||||
QUEUE_URI.add(request.uri());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
|
||||
|
||||
@Override
|
||||
protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> pipelinedRequest) {
|
||||
LastHttpContent request = pipelinedRequest.getRequest();
|
||||
final QueryStringDecoder decoder;
|
||||
if (request instanceof FullHttpRequest) {
|
||||
decoder = new QueryStringDecoder(((FullHttpRequest)request).uri());
|
||||
} else {
|
||||
decoder = new QueryStringDecoder(AggregateUrisAndHeadersHandler.QUEUE_URI.poll());
|
||||
}
|
||||
protected void channelRead0(final ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) {
|
||||
final HttpRequest request = pipelinedRequest.getDelegateRequest();
|
||||
final QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
|
||||
|
||||
final String uri = decoder.path().replace("/", "");
|
||||
final BytesReference content = new BytesArray(uri.getBytes(StandardCharsets.UTF_8));
|
||||
NioHttpRequest nioHttpRequest = new NioHttpRequest(pipelinedRequest.getRequest(), pipelinedRequest.getSequence());
|
||||
NioHttpResponse httpResponse = nioHttpRequest.createResponse(RestStatus.OK, content);
|
||||
HttpResponse httpResponse = pipelinedRequest.createResponse(RestStatus.OK, content);
|
||||
httpResponse.addHeader(CONTENT_LENGTH.toString(), Integer.toString(content.length()));
|
||||
|
||||
final CountDownLatch waitingLatch = new CountDownLatch(1);
|
||||
|
@ -305,18 +305,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
|
||||
* @param httpChannel that received the http request
|
||||
*/
|
||||
public void incomingRequest(final HttpRequest httpRequest, final HttpChannel httpChannel) {
|
||||
handleIncomingRequest(httpRequest, httpChannel, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method handles an incoming http request that has encountered an error.
|
||||
*
|
||||
* @param httpRequest that is incoming
|
||||
* @param httpChannel that received the http request
|
||||
* @param exception that was encountered
|
||||
*/
|
||||
public void incomingRequestError(final HttpRequest httpRequest, final HttpChannel httpChannel, final Exception exception) {
|
||||
handleIncomingRequest(httpRequest, httpChannel, exception);
|
||||
handleIncomingRequest(httpRequest, httpChannel, httpRequest.getInboundException());
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
|
@ -68,6 +68,7 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann
|
||||
HttpHandlingSettings settings, ThreadContext threadContext, @Nullable HttpTracer tracerLog) {
|
||||
super(request, settings.getDetailedErrorsEnabled());
|
||||
this.httpChannel = httpChannel;
|
||||
// TODO: Fix
|
||||
this.httpRequest = httpRequest;
|
||||
this.bigArrays = bigArrays;
|
||||
this.settings = settings;
|
||||
|
@ -16,16 +16,79 @@
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.http;
|
||||
|
||||
public class HttpPipelinedRequest<R> implements HttpPipelinedMessage {
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class HttpPipelinedRequest implements HttpRequest, HttpPipelinedMessage {
|
||||
|
||||
private final R request;
|
||||
private final int sequence;
|
||||
private final HttpRequest delegate;
|
||||
|
||||
HttpPipelinedRequest(int sequence, R request) {
|
||||
public HttpPipelinedRequest(int sequence, HttpRequest delegate) {
|
||||
this.sequence = sequence;
|
||||
this.request = request;
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestRequest.Method method() {
|
||||
return delegate.method();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uri() {
|
||||
return delegate.uri();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BytesReference content() {
|
||||
return delegate.content();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, List<String>> getHeaders() {
|
||||
return delegate.getHeaders();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> strictCookies() {
|
||||
return delegate.strictCookies();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpVersion protocolVersion() {
|
||||
return delegate.protocolVersion();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpRequest removeHeader(String header) {
|
||||
return delegate.removeHeader(header);
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpPipelinedResponse createResponse(RestStatus status, BytesReference content) {
|
||||
return new HttpPipelinedResponse(sequence, delegate.createResponse(status, content));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void release() {
|
||||
delegate.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpRequest releaseAndCopy() {
|
||||
return delegate.releaseAndCopy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exception getInboundException() {
|
||||
return delegate.getInboundException();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -33,7 +96,7 @@ public class HttpPipelinedRequest<R> implements HttpPipelinedMessage {
|
||||
return sequence;
|
||||
}
|
||||
|
||||
public R getRequest() {
|
||||
return request;
|
||||
public HttpRequest getDelegateRequest() {
|
||||
return delegate;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.http;
|
||||
|
||||
public class HttpPipelinedResponse implements HttpPipelinedMessage, HttpResponse {
|
||||
|
||||
private final int sequence;
|
||||
private final HttpResponse delegate;
|
||||
|
||||
public HttpPipelinedResponse(int sequence, HttpResponse delegate) {
|
||||
this.sequence = sequence;
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSequence() {
|
||||
return sequence;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addHeader(String name, String value) {
|
||||
delegate.addHeader(name, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean containsHeader(String name) {
|
||||
return delegate.containsHeader(name);
|
||||
}
|
||||
|
||||
public HttpResponse getDelegateRequest() {
|
||||
return delegate;
|
||||
}
|
||||
}
|
@ -25,10 +25,10 @@ import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.PriorityQueue;
|
||||
|
||||
public class HttpPipeliningAggregator<Response extends HttpPipelinedMessage, Listener> {
|
||||
public class HttpPipeliningAggregator<Listener> {
|
||||
|
||||
private final int maxEventsHeld;
|
||||
private final PriorityQueue<Tuple<Response, Listener>> outboundHoldingQueue;
|
||||
private final PriorityQueue<Tuple<HttpPipelinedResponse, Listener>> outboundHoldingQueue;
|
||||
/*
|
||||
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
|
||||
* channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the
|
||||
@ -42,20 +42,20 @@ public class HttpPipeliningAggregator<Response extends HttpPipelinedMessage, Lis
|
||||
this.outboundHoldingQueue = new PriorityQueue<>(1, Comparator.comparing(Tuple::v1));
|
||||
}
|
||||
|
||||
public <Request> HttpPipelinedRequest<Request> read(final Request request) {
|
||||
return new HttpPipelinedRequest<>(readSequence++, request);
|
||||
public HttpPipelinedRequest read(final HttpRequest request) {
|
||||
return new HttpPipelinedRequest(readSequence++, request);
|
||||
}
|
||||
|
||||
public List<Tuple<Response, Listener>> write(final Response response, Listener listener) {
|
||||
public List<Tuple<HttpPipelinedResponse, Listener>> write(final HttpPipelinedResponse response, Listener listener) {
|
||||
if (outboundHoldingQueue.size() < maxEventsHeld) {
|
||||
ArrayList<Tuple<Response, Listener>> readyResponses = new ArrayList<>();
|
||||
ArrayList<Tuple<HttpPipelinedResponse, Listener>> readyResponses = new ArrayList<>();
|
||||
outboundHoldingQueue.add(new Tuple<>(response, listener));
|
||||
while (!outboundHoldingQueue.isEmpty()) {
|
||||
/*
|
||||
* Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence
|
||||
* number does not match the current write sequence number then we have not processed all preceding responses yet.
|
||||
*/
|
||||
final Tuple<Response, Listener> top = outboundHoldingQueue.peek();
|
||||
final Tuple<HttpPipelinedResponse, Listener> top = outboundHoldingQueue.peek();
|
||||
|
||||
if (top.v1().getSequence() != writeSequence) {
|
||||
break;
|
||||
@ -73,8 +73,8 @@ public class HttpPipeliningAggregator<Response extends HttpPipelinedMessage, Lis
|
||||
}
|
||||
}
|
||||
|
||||
public List<Tuple<Response, Listener>> removeAllInflightResponses() {
|
||||
ArrayList<Tuple<Response, Listener>> responses = new ArrayList<>(outboundHoldingQueue);
|
||||
public List<Tuple<HttpPipelinedResponse, Listener>> removeAllInflightResponses() {
|
||||
ArrayList<Tuple<HttpPipelinedResponse, Listener>> responses = new ArrayList<>(outboundHoldingQueue);
|
||||
outboundHoldingQueue.clear();
|
||||
return responses;
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
package org.elasticsearch.http;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
@ -68,6 +69,9 @@ public interface HttpRequest {
|
||||
*/
|
||||
HttpResponse createResponse(RestStatus status, BytesReference content);
|
||||
|
||||
@Nullable
|
||||
Exception getInboundException();
|
||||
|
||||
/**
|
||||
* Release any resources associated with this request. Implementations should be idempotent. The behavior of {@link #content()}
|
||||
* after this method has been invoked is undefined and implementation specific.
|
||||
|
@ -250,33 +250,37 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
|
||||
"received other request", traceLoggerName, Level.TRACE,
|
||||
"\\[\\d+\\]\\[" + opaqueId + "\\]\\[OPTIONS\\]\\[/internal/testNotSeen\\] received request from \\[.*"));
|
||||
|
||||
final Exception inboundException;
|
||||
if (badRequest) {
|
||||
inboundException = new RuntimeException();
|
||||
} else {
|
||||
inboundException = null;
|
||||
}
|
||||
|
||||
final FakeRestRequest fakeRestRequest = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
|
||||
.withMethod(RestRequest.Method.OPTIONS)
|
||||
.withPath("/internal/test")
|
||||
.withHeaders(Collections.singletonMap(Task.X_OPAQUE_ID, Collections.singletonList(opaqueId)))
|
||||
.withInboundException(inboundException)
|
||||
.build();
|
||||
|
||||
if (badRequest) {
|
||||
transport.incomingRequestError(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel(),
|
||||
new RuntimeException());
|
||||
transport.incomingRequest(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel());
|
||||
|
||||
final Exception inboundExceptionExcludedPath;
|
||||
if (randomBoolean()) {
|
||||
inboundExceptionExcludedPath = new RuntimeException();
|
||||
} else {
|
||||
transport.incomingRequest(fakeRestRequest.getHttpRequest(), fakeRestRequest.getHttpChannel());
|
||||
inboundExceptionExcludedPath = null;
|
||||
}
|
||||
|
||||
final FakeRestRequest fakeRestRequestExcludedPath = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
|
||||
.withMethod(RestRequest.Method.OPTIONS)
|
||||
.withPath("/internal/testNotSeen")
|
||||
.withHeaders(Collections.singletonMap(Task.X_OPAQUE_ID, Collections.singletonList(opaqueId)))
|
||||
.withInboundException(inboundExceptionExcludedPath)
|
||||
.build();
|
||||
|
||||
if (randomBoolean()) {
|
||||
transport.incomingRequest(fakeRestRequestExcludedPath.getHttpRequest(), fakeRestRequestExcludedPath.getHttpChannel());
|
||||
} else {
|
||||
transport.incomingRequestError(
|
||||
fakeRestRequestExcludedPath.getHttpRequest(), fakeRestRequestExcludedPath.getHttpChannel(),
|
||||
new RuntimeException());
|
||||
}
|
||||
|
||||
transport.incomingRequest(fakeRestRequestExcludedPath.getHttpRequest(), fakeRestRequestExcludedPath.getHttpChannel());
|
||||
appender.assertAllExpectationsMatched();
|
||||
} finally {
|
||||
Loggers.removeAppender(LogManager.getLogger(traceLoggerName), appender);
|
||||
|
@ -469,6 +469,11 @@ public class DefaultRestChannelTests extends ESTestCase {
|
||||
public HttpRequest releaseAndCopy() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exception getInboundException() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestResponse implements HttpResponse {
|
||||
|
@ -560,6 +560,11 @@ public class RestControllerTests extends ESTestCase {
|
||||
public HttpRequest releaseAndCopy() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exception getInboundException() {
|
||||
return null;
|
||||
}
|
||||
}, null);
|
||||
|
||||
final AssertingChannel channel = new AssertingChannel(request, true, RestStatus.METHOD_NOT_ALLOWED);
|
||||
|
@ -54,12 +54,19 @@ public class FakeRestRequest extends RestRequest {
|
||||
private final String uri;
|
||||
private final BytesReference content;
|
||||
private final Map<String, List<String>> headers;
|
||||
private final Exception inboundException;
|
||||
|
||||
private FakeHttpRequest(Method method, String uri, BytesReference content, Map<String, List<String>> headers) {
|
||||
this(method, uri, content, headers, null);
|
||||
}
|
||||
|
||||
private FakeHttpRequest(Method method, String uri, BytesReference content, Map<String, List<String>> headers,
|
||||
Exception inboundException) {
|
||||
this.method = method;
|
||||
this.uri = uri;
|
||||
this.content = content;
|
||||
this.headers = headers;
|
||||
this.inboundException = inboundException;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -122,6 +129,11 @@ public class FakeRestRequest extends RestRequest {
|
||||
public HttpRequest releaseAndCopy() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Exception getInboundException() {
|
||||
return inboundException;
|
||||
}
|
||||
}
|
||||
|
||||
private static class FakeHttpChannel implements HttpChannel {
|
||||
@ -178,6 +190,8 @@ public class FakeRestRequest extends RestRequest {
|
||||
|
||||
private InetSocketAddress address = null;
|
||||
|
||||
private Exception inboundException;
|
||||
|
||||
public Builder(NamedXContentRegistry xContentRegistry) {
|
||||
this.xContentRegistry = xContentRegistry;
|
||||
}
|
||||
@ -215,8 +229,13 @@ public class FakeRestRequest extends RestRequest {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withInboundException(Exception exception) {
|
||||
this.inboundException = exception;
|
||||
return this;
|
||||
}
|
||||
|
||||
public FakeRestRequest build() {
|
||||
FakeHttpRequest fakeHttpRequest = new FakeHttpRequest(method, path, content, headers);
|
||||
FakeHttpRequest fakeHttpRequest = new FakeHttpRequest(method, path, content, headers, inboundException);
|
||||
return new FakeRestRequest(xContentRegistry, fakeHttpRequest, params, new FakeHttpChannel(address));
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user