diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java index bea64c1946d..77d9d9c32f0 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamResetTest.java @@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.servlet.AsyncContext; -import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; import javax.servlet.http.HttpServlet; @@ -244,12 +243,13 @@ public class StreamResetTest extends AbstractTest @Test public void testBlockingWriteAfterStreamReceivingReset() throws Exception { - final CountDownLatch resetLatch = new CountDownLatch(1); - final CountDownLatch dataLatch = new CountDownLatch(1); + CountDownLatch commitLatch = new CountDownLatch(1); + CountDownLatch resetLatch = new CountDownLatch(1); + CountDownLatch dataLatch = new CountDownLatch(1); start(new HttpServlet() { @Override - protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException { Charset charset = StandardCharsets.UTF_8; byte[] data = "AFTER RESET".getBytes(charset); @@ -258,11 +258,15 @@ public class StreamResetTest extends AbstractTest response.setContentType("text/plain;charset=" + charset.name()); response.setContentLength(data.length * 10); response.flushBuffer(); + // Wait for the commit callback to complete. + commitLatch.countDown(); try { - // Wait for the reset to happen. - Assert.assertTrue(resetLatch.await(10, TimeUnit.SECONDS)); + // Wait for the reset to be sent. + Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + // Wait for the reset to arrive to the server and be processed. + Thread.sleep(1000); } catch (InterruptedException x) { @@ -282,7 +286,7 @@ public class StreamResetTest extends AbstractTest } catch (InterruptedException x) { - + throw new InterruptedIOException(); } catch (IOException x) { @@ -299,23 +303,33 @@ public class StreamResetTest extends AbstractTest @Override public void onHeaders(Stream stream, HeadersFrame frame) { - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); - resetLatch.countDown(); + try + { + commitLatch.await(5, TimeUnit.SECONDS); + Callback.Completable completable = new Callback.Completable(); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), completable); + completable.thenRun(resetLatch::countDown); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } } }); - Assert.assertTrue(dataLatch.await(10, TimeUnit.SECONDS)); + Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); } @Test public void testAsyncWriteAfterStreamReceivingReset() throws Exception { - final CountDownLatch resetLatch = new CountDownLatch(1); - final CountDownLatch dataLatch = new CountDownLatch(1); + CountDownLatch commitLatch = new CountDownLatch(1); + CountDownLatch resetLatch = new CountDownLatch(1); + CountDownLatch dataLatch = new CountDownLatch(1); start(new HttpServlet() { @Override - protected void doGet(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException + protected void doGet(HttpServletRequest request, final HttpServletResponse response) throws IOException { Charset charset = StandardCharsets.UTF_8; final ByteBuffer data = ByteBuffer.wrap("AFTER RESET".getBytes(charset)); @@ -324,6 +338,8 @@ public class StreamResetTest extends AbstractTest response.setContentType("text/plain;charset=" + charset.name()); response.setContentLength(data.remaining()); response.flushBuffer(); + // Wait for the commit callback to complete. + commitLatch.countDown(); try { @@ -339,34 +355,30 @@ public class StreamResetTest extends AbstractTest // Write some content asynchronously after the stream has been reset. final AsyncContext context = request.startAsync(); - new Thread() + new Thread(() -> { - @Override - public void run() + try { - try - { - // Wait for the request thread to exit - // doGet() so this is really asynchronous. - Thread.sleep(1000); + // Wait for the request thread to exit + // doGet() so this is really asynchronous. + Thread.sleep(1000); - HttpOutput output = (HttpOutput)response.getOutputStream(); - output.sendContent(data, new Callback() - { - @Override - public void failed(Throwable x) - { - context.complete(); - dataLatch.countDown(); - } - }); - } - catch (Throwable x) + HttpOutput output = (HttpOutput)response.getOutputStream(); + output.sendContent(data, new Callback() { - x.printStackTrace(); - } + @Override + public void failed(Throwable x) + { + context.complete(); + dataLatch.countDown(); + } + }); } - }.start(); + catch (Throwable x) + { + x.printStackTrace(); + } + }).start(); } }); @@ -378,8 +390,17 @@ public class StreamResetTest extends AbstractTest @Override public void onHeaders(Stream stream, HeadersFrame frame) { - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); - resetLatch.countDown(); + try + { + commitLatch.await(5, TimeUnit.SECONDS); + Callback.Completable completable = new Callback.Completable(); + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), completable); + completable.thenRun(resetLatch::countDown); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } } }); @@ -439,7 +460,7 @@ public class StreamResetTest extends AbstractTest context.addServlet(new ServletHolder(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { phaser.get().countDown(); IO.copy(request.getInputStream(), response.getOutputStream()); @@ -526,7 +547,7 @@ public class StreamResetTest extends AbstractTest start(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { try { @@ -578,7 +599,7 @@ public class StreamResetTest extends AbstractTest start(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) { AsyncContext asyncContext = request.startAsync(); asyncContext.start(() -> @@ -642,7 +663,7 @@ public class StreamResetTest extends AbstractTest start(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) { try { @@ -694,7 +715,7 @@ public class StreamResetTest extends AbstractTest start(new HttpServlet() { @Override - protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException { AsyncContext asyncContext = request.startAsync(); ServletOutputStream output = response.getOutputStream(); diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index d862891c8f8..ff5043193ae 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -303,9 +303,10 @@ public class HttpTransportOverHTTP2 implements HttpTransport commit = this.commit; if (state == State.WRITING) { + this.state = State.IDLE; callback = this.callback; this.callback = null; - this.state = State.IDLE; + this.commit = false; } } if (LOG.isDebugEnabled()) @@ -330,13 +331,13 @@ public class HttpTransportOverHTTP2 implements HttpTransport if (state == State.WRITING) { this.state = State.FAILED; - this.failure = failure; callback = this.callback; this.callback = null; + this.failure = failure; } } if (LOG.isDebugEnabled()) - LOG.debug(String.format("HTTP2 Response #%d/%h failed to %s", stream.getId(), stream.getSession(), commit ? "commit" : "flush"), failure); + LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(), commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure); if (callback != null) callback.failed(failure); }