From 30f723d2b013ae695a551f9e49e45126f9251e17 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 22 Feb 2017 20:47:00 -0500 Subject: [PATCH] Add comments to HttpPipeliningHandler This commit adds some comments explaining the design of HttpPipeliningHandler. --- .../pipelining/HttpPipeliningHandler.java | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipeliningHandler.java index 48bab56f066..1b955f0e00e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/pipelining/HttpPipeliningHandler.java @@ -1,4 +1,3 @@ - /* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with @@ -28,37 +27,40 @@ import org.elasticsearch.transport.netty4.Netty4Utils; import java.util.Collections; import java.util.PriorityQueue; -import java.util.Queue; /** - * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their - * corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will - * cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely - * OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects. + * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests. */ public class HttpPipeliningHandler extends ChannelDuplexHandler { - private static final int INITIAL_EVENTS_HELD = 3; + private static final int INITIAL_EVENTS_HELD = 8; private final int maxEventsHeld; + /* + * The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the + * channel, and then transferred to responses. A response is not written to the channel context until its sequence number matches the + * current write sequence, implying that all preceding messages have been written. + */ private int readSequence; private int writeSequence; - private final Queue holdingQueue; + private final PriorityQueue holdingQueue; /** - * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel - * connection. This is required as events cannot queue up indefinitely; we would run out of - * memory if this was the case. + * Construct a new pipelining handler; this handler should be used downstream of HTTP decoding/aggregation. + * + * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel connection; this is + * required as events cannot queue up indefinitely */ public HttpPipeliningHandler(final int maxEventsHeld) { this.maxEventsHeld = maxEventsHeld; + // we use a priority queue so that responses are ordered by their sequence number this.holdingQueue = new PriorityQueue<>(INITIAL_EVENTS_HELD); } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { if (msg instanceof LastHttpContent) { ctx.fireChannelRead(new HttpPipelinedRequest(((LastHttpContent) msg).retain(), readSequence++)); } else { @@ -67,7 +69,7 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler { } @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception { if (msg instanceof HttpPipelinedResponse) { boolean channelShouldClose = false; @@ -76,6 +78,10 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler { holdingQueue.add((HttpPipelinedResponse) msg); while (!holdingQueue.isEmpty()) { + /* + * Since the response with the lowest sequence number is the top of the priority queue, we know if its sequence + * number does not match the current write sequence then we have not processed all preceding responses yet. + */ final HttpPipelinedResponse response = holdingQueue.peek(); if (response.sequence() != writeSequence) { break;