Fixes #1595 - Avoid sending unnecessary stream WINDOW_UPDATE frames.

Now the flow control strategy checks whether the stream is already
remotely closed, and if so it does not send the window update.
This commit is contained in:
Simone Bordet 2017-06-05 18:07:29 +02:00
parent 82cadedc5e
commit 28e6378b26
6 changed files with 136 additions and 54 deletions

View File

@ -37,13 +37,16 @@ import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.BufferingFlowControlStrategy;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.FlowControlStrategy;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.HTTP2Stream;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
@ -52,6 +55,7 @@ import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.server.HttpConfiguration;
@ -845,7 +849,7 @@ public abstract class FlowControlStrategyTest
{
return InvocationType.NON_BLOCKING;
}
@Override
public void succeeded()
{
@ -916,7 +920,7 @@ public abstract class FlowControlStrategyTest
{
return InvocationType.NON_BLOCKING;
}
@Override
public void succeeded()
{
@ -997,7 +1001,7 @@ public abstract class FlowControlStrategyTest
{
return InvocationType.NON_BLOCKING;
}
@Override
public void failed(Throwable x)
{
@ -1008,4 +1012,71 @@ public abstract class FlowControlStrategyTest
Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testNoWindowUpdateForRemotelyClosedStream() throws Exception
{
List<Callback> callbacks = new ArrayList<>();
start(new ServerSessionListener.Adapter()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
return new Stream.Listener.Adapter()
{
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
callbacks.add(callback);
if (frame.isEndStream())
{
// Succeed the callbacks when the stream is already remotely closed.
callbacks.forEach(Callback::succeeded);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
}
};
}
});
List<WindowUpdateFrame> sessionWindowUpdates = new ArrayList<>();
List<WindowUpdateFrame> streamWindowUpdates = new ArrayList<>();
client.setFlowControlStrategyFactory(() -> new BufferingFlowControlStrategy(0.5F)
{
@Override
public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
{
if (frame.getStreamId() == 0)
sessionWindowUpdates.add(frame);
else
streamWindowUpdates.add(frame);
super.onWindowUpdate(session, stream, frame);
}
});
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", new HttpFields());
HeadersFrame frame = new HeadersFrame(metaData, null, false);
FuturePromise<Stream> streamPromise = new FuturePromise<>();
CountDownLatch latch = new CountDownLatch(1);
session.newStream(frame, streamPromise, new Stream.Listener.Adapter()
{
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
if (frame.isEndStream())
latch.countDown();
}
});
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE - 1);
stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(sessionWindowUpdates.size() > 0);
Assert.assertEquals(0, streamWindowUpdates.size());
}
}

View File

@ -130,10 +130,10 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
Frame[] windowFrames = Frame.EMPTY_ARRAY;
if (stream != null)
{
if (stream.isClosed())
if (stream.isRemotelyClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, ignoring update stream recv window for closed {}", length, stream);
LOG.debug("Data consumed, {} bytes, ignoring update stream recv window for remotely closed {}", length, stream);
}
else
{

View File

@ -243,6 +243,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
complete();
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
private void complete()
{
notIdle();
@ -398,14 +404,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
/**
* This method is called when receiving a GO_AWAY from the other peer.
* We check the close state to act appropriately:
*
* * NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so
* that the content of the queue is written, and then the connection
* closed. We notify the application after being terminated.
* See <code>HTTP2Session.ControlEntry#succeeded()</code>
*
* * In all other cases, we do nothing since other methods are already
* performing their actions.
* <ul>
* <li>NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so
* that the content of the queue is written, and then the connection
* closed. We notify the application after being terminated.
* See <code>HTTP2Session.ControlEntry#succeeded()</code></li>
* <li>In all other cases, we do nothing since other methods are already
* performing their actions.</li>
* </ul>
*
* @param frame the GO_AWAY frame that has been received.
* @see #close(int, String, Callback)
@ -567,21 +573,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
/**
* Invoked internally and by applications to send a GO_AWAY frame to the
* other peer. We check the close state to act appropriately:
* <ul>
* <li>NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the
* GO_AWAY has been written, it will only cause the output to be shut
* down (not the connection closed), so that the application can still
* read frames arriving from the other peer.
* Ideally the other peer will notice the GO_AWAY and close the connection.
* When that happen, we close the connection from {@link #onShutdown()}.
* Otherwise, the idle timeout mechanism will close the connection, see
* {@link #onIdleTimeout()}.</li>
* <li>In all other cases, we do nothing since other methods are already
* performing their actions.</li>
* </ul>
*
* * NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the
* GO_AWAY has been written, it will only cause the output to be shut
* down (not the connection closed), so that the application can still
* read frames arriving from the other peer.
* Ideally the other peer will notice the GO_AWAY and close the connection.
* When that happen, we close the connection from {@link #onShutdown()}.
* Otherwise, the idle timeout mechanism will close the connection, see
* {@link #onIdleTimeout()}.
*
* * In all other cases, we do nothing since other methods are already
* performing their actions.
*
* @param error the error code
* @param reason the reason
* @param error the error code
* @param reason the reason
* @param callback the callback to invoke when the operation is complete
* @see #onGoAway(GoAwayFrame)
* @see #onShutdown()
@ -841,19 +847,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
* A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN.
* This method is invoked when the TCP FIN is received, or when an exception is
* thrown while reading, and we check the close state to act appropriately:
*
* * NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close)
* or there was an exception while reading, and therefore we terminate.
*
* * LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received
* it and closed the connection; we queue a disconnect to close the connection
* on the local side.
* The GO_AWAY just shutdown the output, so we need this step to make sure the
* connection is closed. See {@link #close(int, String, Callback)}.
*
* * REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we
* do nothing since the handling of the GO_AWAY will take care of closing the
* connection. See {@link #onGoAway(GoAwayFrame)}.
* <ul>
* <li>NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close)
* or there was an exception while reading, and therefore we terminate.</li>
* <li>LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received
* it and closed the connection; we queue a disconnect to close the connection
* on the local side.
* The GO_AWAY just shutdown the output, so we need this step to make sure the
* connection is closed. See {@link #close(int, String, Callback)}.</li>
* <li>REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we
* do nothing since the handling of the GO_AWAY will take care of closing the
* connection. See {@link #onGoAway(GoAwayFrame)}.</li>
* </ul>
*
* @see #onGoAway(GoAwayFrame)
* @see #close(int, String, Callback)
@ -898,18 +903,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
/**
* This method is invoked when the idle timeout triggers. We check the close state
* to act appropriately:
*
* * NOT_CLOSED: it's a real idle timeout, we just initiate a close, see
* {@link #close(int, String, Callback)}.
*
* * LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the
* other peer did not close the connection so we never received the TCP FIN, and
* therefore we terminate.
*
* * REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a
* disconnect, but for some reason it was not processed (for example, queue was
* stuck because of TCP congestion), therefore we terminate.
* See {@link #onGoAway(GoAwayFrame)}.
* <ul>
* <li>NOT_CLOSED: it's a real idle timeout, we just initiate a close, see
* {@link #close(int, String, Callback)}.</li>
* <li>LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the
* other peer did not close the connection so we never received the TCP FIN, and
* therefore we terminate.</li>
* <li>REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a
* disconnect, but for some reason it was not processed (for example, queue was
* stuck because of TCP congestion), therefore we terminate.
* See {@link #onGoAway(GoAwayFrame)}.</li>
* </ul>
*
* @return true if the session should be closed, false otherwise
* @see #onGoAway(GoAwayFrame)

View File

@ -152,6 +152,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback
return closeState.get() == CloseState.CLOSED;
}
@Override
public boolean isRemotelyClosed()
{
return closeState.get() == CloseState.REMOTELY_CLOSED;

View File

@ -105,4 +105,10 @@ public interface IStream extends Stream, Closeable
* {@link #getIdleTimeout() idle timeout} is postponed.</p>
*/
public void notIdle();
/**
* @return whether the stream is closed remotely.
* @see #isClosed()
*/
boolean isRemotelyClosed();
}

View File

@ -52,10 +52,10 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy
Frame[] streamFrame = Frame.EMPTY_ARRAY;
if (stream != null)
{
if (stream.isClosed())
if (stream.isRemotelyClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream);
LOG.debug("Data consumed, ignoring update stream recv window by {} for remotely closed {}", length, stream);
}
else
{