diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java index 247e9ff4a45..4522965c33e 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java @@ -37,13 +37,16 @@ import org.eclipse.jetty.http.HostPortHttpField; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.BufferingFlowControlStrategy; import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.HTTP2Stream; import org.eclipse.jetty.http2.ISession; +import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; @@ -52,6 +55,7 @@ import org.eclipse.jetty.http2.frames.GoAwayFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.server.HttpConfiguration; @@ -845,7 +849,7 @@ public abstract class FlowControlStrategyTest { return InvocationType.NON_BLOCKING; } - + @Override public void succeeded() { @@ -916,7 +920,7 @@ public abstract class FlowControlStrategyTest { return InvocationType.NON_BLOCKING; } - + @Override public void succeeded() { @@ -997,7 +1001,7 @@ public abstract class FlowControlStrategyTest { return InvocationType.NON_BLOCKING; } - + @Override public void failed(Throwable x) { @@ -1008,4 +1012,71 @@ public abstract class FlowControlStrategyTest Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testNoWindowUpdateForRemotelyClosedStream() throws Exception + { + List callbacks = new ArrayList<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + callbacks.add(callback); + if (frame.isEndStream()) + { + // Succeed the callbacks when the stream is already remotely closed. + callbacks.forEach(Callback::succeeded); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + } + } + }; + } + }); + + List sessionWindowUpdates = new ArrayList<>(); + List streamWindowUpdates = new ArrayList<>(); + client.setFlowControlStrategyFactory(() -> new BufferingFlowControlStrategy(0.5F) + { + @Override + public void onWindowUpdate(ISession session, IStream stream, WindowUpdateFrame frame) + { + if (frame.getStreamId() == 0) + sessionWindowUpdates.add(frame); + else + streamWindowUpdates.add(frame); + super.onWindowUpdate(session, stream, frame); + } + }); + + Session session = newClient(new Session.Listener.Adapter()); + MetaData.Request metaData = newRequest("POST", new HttpFields()); + HeadersFrame frame = new HeadersFrame(metaData, null, false); + FuturePromise streamPromise = new FuturePromise<>(); + CountDownLatch latch = new CountDownLatch(1); + session.newStream(frame, streamPromise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + latch.countDown(); + } + }); + Stream stream = streamPromise.get(5, TimeUnit.SECONDS); + + ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE - 1); + stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP); + + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + + Assert.assertTrue(sessionWindowUpdates.size() > 0); + Assert.assertEquals(0, streamWindowUpdates.size()); + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java index 85db949aa09..92bbc878a44 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/BufferingFlowControlStrategy.java @@ -130,10 +130,10 @@ public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy Frame[] windowFrames = Frame.EMPTY_ARRAY; if (stream != null) { - if (stream.isClosed()) + if (stream.isRemotelyClosed()) { if (LOG.isDebugEnabled()) - LOG.debug("Data consumed, {} bytes, ignoring update stream recv window for closed {}", length, stream); + LOG.debug("Data consumed, {} bytes, ignoring update stream recv window for remotely closed {}", length, stream); } else { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index dcff424d72e..7859c238946 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -243,6 +243,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio complete(); } + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + private void complete() { notIdle(); @@ -398,14 +404,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio /** * This method is called when receiving a GO_AWAY from the other peer. * We check the close state to act appropriately: - * - * * NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so - * that the content of the queue is written, and then the connection - * closed. We notify the application after being terminated. - * See HTTP2Session.ControlEntry#succeeded() - * - * * In all other cases, we do nothing since other methods are already - * performing their actions. + *
    + *
  • NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so + * that the content of the queue is written, and then the connection + * closed. We notify the application after being terminated. + * See HTTP2Session.ControlEntry#succeeded()
  • + *
  • In all other cases, we do nothing since other methods are already + * performing their actions.
  • + *
* * @param frame the GO_AWAY frame that has been received. * @see #close(int, String, Callback) @@ -567,21 +573,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio /** * Invoked internally and by applications to send a GO_AWAY frame to the * other peer. We check the close state to act appropriately: + *
    + *
  • NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the + * GO_AWAY has been written, it will only cause the output to be shut + * down (not the connection closed), so that the application can still + * read frames arriving from the other peer. + * Ideally the other peer will notice the GO_AWAY and close the connection. + * When that happen, we close the connection from {@link #onShutdown()}. + * Otherwise, the idle timeout mechanism will close the connection, see + * {@link #onIdleTimeout()}.
  • + *
  • In all other cases, we do nothing since other methods are already + * performing their actions.
  • + *
* - * * NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the - * GO_AWAY has been written, it will only cause the output to be shut - * down (not the connection closed), so that the application can still - * read frames arriving from the other peer. - * Ideally the other peer will notice the GO_AWAY and close the connection. - * When that happen, we close the connection from {@link #onShutdown()}. - * Otherwise, the idle timeout mechanism will close the connection, see - * {@link #onIdleTimeout()}. - * - * * In all other cases, we do nothing since other methods are already - * performing their actions. - * - * @param error the error code - * @param reason the reason + * @param error the error code + * @param reason the reason * @param callback the callback to invoke when the operation is complete * @see #onGoAway(GoAwayFrame) * @see #onShutdown() @@ -841,19 +847,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio * A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN. * This method is invoked when the TCP FIN is received, or when an exception is * thrown while reading, and we check the close state to act appropriately: - * - * * NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close) - * or there was an exception while reading, and therefore we terminate. - * - * * LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received - * it and closed the connection; we queue a disconnect to close the connection - * on the local side. - * The GO_AWAY just shutdown the output, so we need this step to make sure the - * connection is closed. See {@link #close(int, String, Callback)}. - * - * * REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we - * do nothing since the handling of the GO_AWAY will take care of closing the - * connection. See {@link #onGoAway(GoAwayFrame)}. + *
    + *
  • NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close) + * or there was an exception while reading, and therefore we terminate.
  • + *
  • LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received + * it and closed the connection; we queue a disconnect to close the connection + * on the local side. + * The GO_AWAY just shutdown the output, so we need this step to make sure the + * connection is closed. See {@link #close(int, String, Callback)}.
  • + *
  • REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we + * do nothing since the handling of the GO_AWAY will take care of closing the + * connection. See {@link #onGoAway(GoAwayFrame)}.
  • + *
* * @see #onGoAway(GoAwayFrame) * @see #close(int, String, Callback) @@ -898,18 +903,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio /** * This method is invoked when the idle timeout triggers. We check the close state * to act appropriately: - * - * * NOT_CLOSED: it's a real idle timeout, we just initiate a close, see - * {@link #close(int, String, Callback)}. - * - * * LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the - * other peer did not close the connection so we never received the TCP FIN, and - * therefore we terminate. - * - * * REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a - * disconnect, but for some reason it was not processed (for example, queue was - * stuck because of TCP congestion), therefore we terminate. - * See {@link #onGoAway(GoAwayFrame)}. + *
    + *
  • NOT_CLOSED: it's a real idle timeout, we just initiate a close, see + * {@link #close(int, String, Callback)}.
  • + *
  • LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the + * other peer did not close the connection so we never received the TCP FIN, and + * therefore we terminate.
  • + *
  • REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a + * disconnect, but for some reason it was not processed (for example, queue was + * stuck because of TCP congestion), therefore we terminate. + * See {@link #onGoAway(GoAwayFrame)}.
  • + *
* * @return true if the session should be closed, false otherwise * @see #onGoAway(GoAwayFrame) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 6572f312195..7ffdc476d39 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -152,6 +152,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback return closeState.get() == CloseState.CLOSED; } + @Override public boolean isRemotelyClosed() { return closeState.get() == CloseState.REMOTELY_CLOSED; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 6137dfe8625..e69195082a8 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -105,4 +105,10 @@ public interface IStream extends Stream, Closeable * {@link #getIdleTimeout() idle timeout} is postponed.

*/ public void notIdle(); + + /** + * @return whether the stream is closed remotely. + * @see #isClosed() + */ + boolean isRemotelyClosed(); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java index 92c0defde38..04774d660a5 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/SimpleFlowControlStrategy.java @@ -52,10 +52,10 @@ public class SimpleFlowControlStrategy extends AbstractFlowControlStrategy Frame[] streamFrame = Frame.EMPTY_ARRAY; if (stream != null) { - if (stream.isClosed()) + if (stream.isRemotelyClosed()) { if (LOG.isDebugEnabled()) - LOG.debug("Data consumed, ignoring update stream recv window by {} for closed {}", length, stream); + LOG.debug("Data consumed, ignoring update stream recv window by {} for remotely closed {}", length, stream); } else {