Add comments to HttpPipeliningHandler

This commit adds some comments explaining the design of
HttpPipeliningHandler.
This commit is contained in:
Jason Tedor 2017-02-22 20:47:00 -05:00
parent 6c9b89b882
commit 30f723d2b0
1 changed files with 19 additions and 13 deletions

View File

@ -1,4 +1,3 @@
/* /*
* Licensed to Elasticsearch under one or more contributor * Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with * license agreements. See the NOTICE file distributed with
@ -28,37 +27,40 @@ import org.elasticsearch.transport.netty4.Netty4Utils;
import java.util.Collections; import java.util.Collections;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Queue;
/** /**
* Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests.
* corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will
* cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely
* OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects.
*/ */
public class HttpPipeliningHandler extends ChannelDuplexHandler { public class HttpPipeliningHandler extends ChannelDuplexHandler {
private static final int INITIAL_EVENTS_HELD = 3; private static final int INITIAL_EVENTS_HELD = 8;
private final int maxEventsHeld; private final int maxEventsHeld;
/*
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
* channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the
* current write sequence, implying that all preceding messages have been written.
*/
private int readSequence; private int readSequence;
private int writeSequence; private int writeSequence;
private final Queue<HttpPipelinedResponse> holdingQueue; private final PriorityQueue<HttpPipelinedResponse> holdingQueue;
/** /**
* @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel * Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation.
* connection. This is required as events cannot queue up indefinitely; we would run out of *
* memory if this was the case. * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is
* required as events cannot queue up indefinitely
*/ */
public HttpPipeliningHandler(final int maxEventsHeld) { public HttpPipeliningHandler(final int maxEventsHeld) {
this.maxEventsHeld = maxEventsHeld; this.maxEventsHeld = maxEventsHeld;
// we use a priority queue so that responses are ordered by their sequence number
this.holdingQueue = new PriorityQueue<>(INITIAL_EVENTS_HELD); this.holdingQueue = new PriorityQueue<>(INITIAL_EVENTS_HELD);
} }
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
if (msg instanceof LastHttpContent) { if (msg instanceof LastHttpContent) {
ctx.fireChannelRead(new HttpPipelinedRequest(((LastHttpContent) msg).retain(), readSequence++)); ctx.fireChannelRead(new HttpPipelinedRequest(((LastHttpContent) msg).retain(), readSequence++));
} else { } else {
@ -67,7 +69,7 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
} }
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception {
if (msg instanceof HttpPipelinedResponse) { if (msg instanceof HttpPipelinedResponse) {
boolean channelShouldClose = false; boolean channelShouldClose = false;
@ -76,6 +78,10 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
holdingQueue.add((HttpPipelinedResponse) msg); holdingQueue.add((HttpPipelinedResponse) msg);
while (!holdingQueue.isEmpty()) { while (!holdingQueue.isEmpty()) {
/*
* Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence
* number does not match the current write sequence then we have not processed all preceding responses yet.
*/
final HttpPipelinedResponse response = holdingQueue.peek(); final HttpPipelinedResponse response = holdingQueue.peek();
if (response.sequence() != writeSequence) { if (response.sequence() != writeSequence) {
break; break;