diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index 776cb56bec8..87cb224f6d4 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -37,6 +37,8 @@ import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.FlowControlStrategy; +import org.eclipse.jetty.http2.ISession; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; @@ -47,6 +49,7 @@ import org.eclipse.jetty.server.HttpOutput; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.FuturePromise; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -221,7 +224,7 @@ public class StreamResetTest extends AbstractTest response.setStatus(200); response.setContentType("text/plain;charset=" + charset.name()); - response.setContentLength(data.length*10); + response.setContentLength(data.length * 10); response.flushBuffer(); try @@ -238,7 +241,7 @@ public class StreamResetTest extends AbstractTest { // Write some content after the stream has // been reset, it should throw an exception. - for (int i=0;i<10;i++) + for (int i = 0; i < 10; i++) { Thread.sleep(500); response.getOutputStream().write(data); @@ -350,4 +353,87 @@ public class StreamResetTest extends AbstractTest Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testClientResetConsumesQueuedData() throws Exception + { + start(new EmptyHttpServlet()); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + CountDownLatch dataLatch = new CountDownLatch(1); + stream.data(new DataFrame(stream.getId(), data, false), new Callback() + { + @Override + public void succeeded() + { + dataLatch.countDown(); + } + }); + // The server does not read the data, so the flow control window should be zero. + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(0, ((ISession)client).updateSendWindow(0)); + + // Now reset the stream. + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); + + // Wait for the server to receive the reset and process + // it, and for the client to process the window updates. + Thread.sleep(1000); + + Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); + } + + @Test + public void testServerExceptionConsumesQueuedData() throws Exception + { + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + try + { + // Wait to let the data sent by the client to be queued. + Thread.sleep(1000); + throw new IllegalStateException(); + } + catch (InterruptedException e) + { + throw new InterruptedIOException(); + } + } + }); + + Session client = newClient(new Session.Listener.Adapter()); + MetaData.Request request = newRequest("GET", new HttpFields()); + HeadersFrame frame = new HeadersFrame(request, null, false); + FuturePromise promise = new FuturePromise<>(); + client.newStream(frame, promise, new Stream.Listener.Adapter()); + Stream stream = promise.get(5, TimeUnit.SECONDS); + ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE); + CountDownLatch dataLatch = new CountDownLatch(1); + stream.data(new DataFrame(stream.getId(), data, false), new Callback() + { + @Override + public void succeeded() + { + dataLatch.countDown(); + } + }); + // The server does not read the data, so the flow control window should be zero. + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(0, ((ISession)client).updateSendWindow(0)); + + // Wait for the server process the exception, and + // for the client to process the window updates. + Thread.sleep(2000); + + Assert.assertThat(((ISession)client).updateSendWindow(0), Matchers.greaterThan(0)); + } } diff --git a/jetty-http2/http2-client/src/test/resources/jetty-logging.properties b/jetty-http2/http2-client/src/test/resources/jetty-logging.properties index b7185f09f50..5304801a325 100644 --- a/jetty-http2/http2-client/src/test/resources/jetty-logging.properties +++ b/jetty-http2/http2-client/src/test/resources/jetty-logging.properties @@ -1,4 +1,5 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -org.eclipse.jetty.http2.hpack.LEVEL=INFO +#org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.http2.LEVEL=DEBUG +org.eclipse.jetty.http2.hpack.LEVEL=INFO #org.eclipse.jetty.servlets.LEVEL=DEBUG diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index 102734bdfeb..21d03ff0a67 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -62,7 +62,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport // copying we can defer to the endpoint return connection.getEndPoint().isOptimizedForDirectBuffers(); } - + public IStream getStream() { return stream; @@ -145,7 +145,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport if (LOG.isDebugEnabled()) LOG.debug("HTTP/2 Push {}",request); - + stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Promise() { @Override @@ -190,16 +190,20 @@ public class HttpTransportOverHTTP2 implements HttpTransport @Override public void onCompleted() { + // If the stream is not closed, it is still reading the request content. + // Send a reset to the other end so that it stops sending data. if (!stream.isClosed()) - { - // If the stream is not closed, it is still reading the request content. - // Send a reset to the other end so that it stops sending data. stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); - // Now that this stream is reset, in-flight data frames will be consumed and discarded. - // Consume the existing queued data frames to avoid stalling the flow control. - HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); - channel.getRequest().getHttpInput().consumeAll(); - } + + // Consume the existing queued data frames to + // avoid stalling the session flow control. + consumeInput(); + } + + protected void consumeInput() + { + HttpChannel channel = (HttpChannel)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); + channel.getRequest().getHttpInput().consumeAll(); } @Override @@ -213,7 +217,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport } private class CommitCallback implements Callback.NonBlocking - { + { @Override public void succeeded() {