Modify pipelining handlers to require full requests (#31280)

Currently the http pipelining handlers seem to support chunked http
content. However, this does not make sense. There is a content
aggregator in the pipeline before the pipelining handler. This means the
pipelining handler should only see full http messages. Additionally, the
request handler immediately after the pipelining handler only supports
full messages.

This commit modifies both nio and netty4 pipelining handlers to assert
that an inbound message is a full http message. Additionally it removes
the tests for chunked content.
This commit is contained in:
Tim Brooks 2018-06-12 23:15:24 -06:00 committed by GitHub
parent 0bfd18cc8b
commit 56ffe553e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 10 additions and 84 deletions

View File

@ -22,7 +22,7 @@ package org.elasticsearch.http.netty4;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.http.HttpPipelinedRequest;
@ -53,17 +53,14 @@ public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof LastHttpContent) {
HttpPipelinedRequest<LastHttpContent> pipelinedRequest = aggregator.read(((LastHttpContent) msg));
ctx.fireChannelRead(pipelinedRequest);
} else {
ctx.fireChannelRead(msg);
}
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = aggregator.read(((FullHttpRequest) msg));
ctx.fireChannelRead(pipelinedRequest);
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof Netty4HttpResponse : "Message must be type: " + Netty4HttpResponse.class;
assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass();;
Netty4HttpResponse response = (Netty4HttpResponse) msg;
boolean success = false;
try {

View File

@ -148,38 +148,6 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
assertTrue(embeddedChannel.isOpen());
}
public void testThatPipeliningWorksWithChunkedRequests() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel =
new EmbeddedChannel(
new AggregateUrisAndHeadersHandler(),
new Netty4HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) {
final DefaultHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/" + i);
embeddedChannel.writeInbound(request);
embeddedChannel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT);
}
final List<CountDownLatch> latches = new ArrayList<>();
for (int i = numberOfRequests - 1; i >= 0; i--) {
latches.add(finishRequest(Integer.toString(i)));
}
for (final CountDownLatch latch : latches) {
latch.await();
}
embeddedChannel.flush();
for (int i = 0; i < numberOfRequests; i++) {
assertReadHttpMessageHasContent(embeddedChannel, Integer.toString(i));
}
assertTrue(embeddedChannel.isOpen());
}
public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new Netty4HttpPipeliningHandler(logger, numberOfRequests),

View File

@ -22,13 +22,11 @@ package org.elasticsearch.http.nio;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.http.HttpPipeliningAggregator;
import org.elasticsearch.http.nio.NettyListener;
import org.elasticsearch.http.nio.NioHttpResponse;
import java.nio.channels.ClosedChannelException;
import java.util.List;
@ -55,17 +53,14 @@ public class NioHttpPipeliningHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof LastHttpContent) {
HttpPipelinedRequest<LastHttpContent> pipelinedRequest = aggregator.read(((LastHttpContent) msg));
ctx.fireChannelRead(pipelinedRequest);
} else {
ctx.fireChannelRead(msg);
}
assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass();
HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = aggregator.read(((FullHttpRequest) msg));
ctx.fireChannelRead(pipelinedRequest);
}
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof NioHttpResponse : "Message must be type: " + NioHttpResponse.class;
assert msg instanceof NioHttpResponse : "Invalid message type: " + msg.getClass();
NioHttpResponse response = (NioHttpResponse) msg;
boolean success = false;
try {

View File

@ -28,12 +28,10 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.elasticsearch.common.Randomness;
@ -147,38 +145,6 @@ public class NioHttpPipeliningHandlerTests extends ESTestCase {
assertTrue(embeddedChannel.isOpen());
}
public void testThatPipeliningWorksWithChunkedRequests() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel =
new EmbeddedChannel(
new AggregateUrisAndHeadersHandler(),
new NioHttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) {
final DefaultHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/" + i);
embeddedChannel.writeInbound(request);
embeddedChannel.writeInbound(LastHttpContent.EMPTY_LAST_CONTENT);
}
final List<CountDownLatch> latches = new ArrayList<>();
for (int i = numberOfRequests - 1; i >= 0; i--) {
latches.add(finishRequest(Integer.toString(i)));
}
for (final CountDownLatch latch : latches) {
latch.await();
}
embeddedChannel.flush();
for (int i = 0; i < numberOfRequests; i++) {
assertReadHttpMessageHasContent(embeddedChannel, Integer.toString(i));
}
assertTrue(embeddedChannel.isOpen());
}
public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new NioHttpPipeliningHandler(logger, numberOfRequests),