Release pipelined http responses on close (#26226)

Right now it is possible for the `HttpPipeliningHandler` to queue
pipelined responses. On channel close, we do not clear and release these
responses. This commit releases the responses and completes the promise.
This commit is contained in:
Tim Brooks 2017-08-16 13:23:32 -05:00 committed by GitHub
parent 22292e8d96
commit f69cc78b67
3 changed files with 68 additions and 11 deletions

View File

@ -566,7 +566,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.getCorsConfig()));
}
if (transport.pipelining) {
ch.pipeline().addLast("pipelining", new HttpPipeliningHandler(transport.pipeliningMaxEvents));
ch.pipeline().addLast("pipelining", new HttpPipeliningHandler(transport.logger, transport.pipeliningMaxEvents));
}
ch.pipeline().addLast("handler", requestHandler);
}

View File

@ -23,8 +23,10 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.LastHttpContent;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.nio.channels.ClosedChannelException;
import java.util.Collections;
import java.util.PriorityQueue;
@ -36,6 +38,7 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
// we use a priority queue so that responses are ordered by their sequence number
private final PriorityQueue<HttpPipelinedResponse> holdingQueue;
private final Logger logger;
private final int maxEventsHeld;
/*
@ -49,10 +52,12 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
/**
* Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
*
* @param logger for logging unexpected errors
* @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is
* required as events cannot queue up indefinitely
*/
public HttpPipeliningHandler(final int maxEventsHeld) {
public HttpPipeliningHandler(Logger logger, final int maxEventsHeld) {
this.logger = logger;
this.maxEventsHeld = maxEventsHeld;
this.holdingQueue = new PriorityQueue<>(1);
}
@ -120,4 +125,20 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (holdingQueue.isEmpty() == false) {
ClosedChannelException closedChannelException = new ClosedChannelException();
HttpPipelinedResponse pipelinedResponse;
while ((pipelinedResponse = holdingQueue.poll()) != null) {
try {
pipelinedResponse.release();
pipelinedResponse.promise().setFailure(closedChannelException);
} catch (Exception e) {
logger.error("unexpected error while releasing pipelined http responses", e);
}
}
}
ctx.close(promise);
}
}

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.Randomness;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -61,9 +62,9 @@ import static org.hamcrest.core.Is.is;
public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
private ExecutorService executorService = Executors.newFixedThreadPool(randomIntBetween(4, 8));
private Map<String, CountDownLatch> waitingRequests = new ConcurrentHashMap<>();
private Map<String, CountDownLatch> finishingRequests = new ConcurrentHashMap<>();
private final ExecutorService executorService = Executors.newFixedThreadPool(randomIntBetween(4, 8));
private final Map<String, CountDownLatch> waitingRequests = new ConcurrentHashMap<>();
private final Map<String, CountDownLatch> finishingRequests = new ConcurrentHashMap<>();
@After
public void tearDown() throws Exception {
@ -86,7 +87,8 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
public void testThatPipeliningWorksWithFastSerializedRequests() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(numberOfRequests), new WorkEmulatorHandler());
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) {
embeddedChannel.writeInbound(createHttpRequest("/" + String.valueOf(i)));
@ -112,7 +114,8 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
public void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(numberOfRequests), new WorkEmulatorHandler());
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) {
embeddedChannel.writeInbound(createHttpRequest("/" + String.valueOf(i)));
@ -144,7 +147,7 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
final EmbeddedChannel embeddedChannel =
new EmbeddedChannel(
new AggregateUrisAndHeadersHandler(),
new HttpPipeliningHandler(numberOfRequests),
new HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < numberOfRequests; i++) {
@ -173,7 +176,8 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
public void testThatPipeliningClosesConnectionWithTooManyEvents() throws InterruptedException {
final int numberOfRequests = randomIntBetween(2, 128);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(numberOfRequests), new WorkEmulatorHandler());
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests),
new WorkEmulatorHandler());
for (int i = 0; i < 1 + numberOfRequests + 1; i++) {
embeddedChannel.writeInbound(createHttpRequest("/" + Integer.toString(i)));
@ -198,6 +202,40 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
assertFalse(embeddedChannel.isOpen());
}
public void testPipeliningRequestsAreReleased() throws InterruptedException {
final int numberOfRequests = 10;
final EmbeddedChannel embeddedChannel =
new EmbeddedChannel(new HttpPipeliningHandler(logger, numberOfRequests + 1));
for (int i = 0; i < numberOfRequests; i++) {
embeddedChannel.writeInbound(createHttpRequest("/" + i));
}
HttpPipelinedRequest inbound;
ArrayList<HttpPipelinedRequest> requests = new ArrayList<>();
while ((inbound = embeddedChannel.readInbound()) != null) {
requests.add(inbound);
}
ArrayList<ChannelPromise> promises = new ArrayList<>();
for (int i = 1; i < requests.size(); ++i) {
final DefaultFullHttpResponse httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK);
ChannelPromise promise = embeddedChannel.newPromise();
promises.add(promise);
HttpPipelinedResponse response = requests.get(i).createHttpResponse(httpResponse, promise);
embeddedChannel.writeAndFlush(response, promise);
}
for (ChannelPromise promise : promises) {
assertFalse(promise.isDone());
}
embeddedChannel.close().syncUninterruptibly();
for (ChannelPromise promise : promises) {
assertTrue(promise.isDone());
assertTrue(promise.cause() instanceof ClosedChannelException);
}
}
private void assertReadHttpMessageHasContent(EmbeddedChannel embeddedChannel, String expectedContent) {
FullHttpResponse response = (FullHttpResponse) embeddedChannel.outboundMessages().poll();
@ -255,7 +293,5 @@ public class Netty4HttpPipeliningHandlerTests extends ESTestCase {
}
});
}
}
}