diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java index a2c2345d6e2..b7018a11c35 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java @@ -18,9 +18,11 @@ package org.eclipse.jetty.http2.client; +import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.http.HttpFields; @@ -30,7 +32,11 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.CloseState; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.ISession; +import org.eclipse.jetty.http2.IStream; +import org.eclipse.jetty.http2.SimpleFlowControlStrategy; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.api.server.ServerSessionListener; @@ -173,7 +179,7 @@ public class GoAwayTest extends AbstractTest }); Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); -// Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); @@ -354,9 +360,125 @@ public class GoAwayTest extends AbstractTest // The server should have sent the GOAWAY after the last stream completed. Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGoAwayWithStalledStreamServerConsumesDataOfInFlightStream() throws Exception + { + int flowControlWindow = 32 * 1024; + + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + AtomicInteger dataFrames = new AtomicInteger(); + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + // Do not consume the data for this stream (i.e. don't succeed the callback). + // Only send the response when receiving the first DATA frame. + if (dataFrames.incrementAndGet() == 1) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + } + } + }; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }, h2 -> + { + // Use the simple, predictable, strategy for window updates. + h2.setFlowControlStrategyFactory(SimpleFlowControlStrategy::new); + h2.setInitialSessionRecvWindow(flowControlWindow); + h2.setInitialStreamRecvWindow(flowControlWindow); + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + // This is necessary because the server session window is smaller than the + // default and the server cannot send a WINDOW_UPDATE with a negative value. + ((ISession)clientSession).updateSendWindow(flowControlWindow - FlowControlStrategy.DEFAULT_WINDOW_SIZE); + + MetaData.Request request1 = newRequest("GET", new HttpFields()); + HeadersFrame headersFrame1 = new HeadersFrame(request1, null, false); + DataFrame dataFrame1 = new DataFrame(ByteBuffer.allocate(flowControlWindow / 2), false); + ((ISession)clientSession).newStream(new IStream.FrameList(headersFrame1, dataFrame1, null), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream clientStream1, HeadersFrame frame) + { + // Send the server GOAWAY frame. + serverSessionRef.get().close(ErrorCode.NO_ERROR.code, null, Callback.NOOP); + + // Send a second, in-flight, stream with data, which + // will exhaust the client session flow control window. + // The server should consume the data even if it will drop + // this stream, so that the first stream can send more data. + MetaData.Request request2 = newRequest("POST", new HttpFields()); + HeadersFrame headersFrame2 = new HeadersFrame(request2, null, false); + DataFrame dataFrame2 = new DataFrame(ByteBuffer.allocate(flowControlWindow / 2), true); + ((ISession)clientStream1.getSession()).newStream(new IStream.FrameList(headersFrame2, dataFrame2, null), new Promise.Adapter() + { + @Override + public void succeeded(Stream clientStream2) + { + // After the in-flight stream is sent, try to complete the first stream. + // The client should receive the window update from + // the server and be able to complete this stream. + clientStream1.data(new DataFrame(clientStream1.getId(), ByteBuffer.allocate(flowControlWindow / 2), true), Callback.NOOP); + } + }, new Adapter()); + } + }); + + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); @@ -643,6 +765,7 @@ public class GoAwayTest extends AbstractTest }); Session clientSession = newClient(new Session.Listener.Adapter()); + // TODO: get rid of sleep! // Wait for the SETTINGS frames to be exchanged. Thread.sleep(500); @@ -691,6 +814,8 @@ public class GoAwayTest extends AbstractTest Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); } + // TODO: add a shutdown test with pending stream. + @Test public void testServerIdleTimeout() throws Exception { @@ -729,7 +854,8 @@ public class GoAwayTest extends AbstractTest @Override public void onGoAway(Session session, GoAwayFrame frame) { - clientGoAwayLatch.countDown(); + if (!frame.isGraceful()) + clientGoAwayLatch.countDown(); } @Override @@ -802,13 +928,22 @@ public class GoAwayTest extends AbstractTest clientCloseLatch.countDown(); } }); + CountDownLatch clientResetLatch = new CountDownLatch(1); MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); // Send request headers but not data. - clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter()); + clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + clientResetLatch.countDown(); + } + }); Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); // Server idle timeout sends a non-graceful GOAWAY. - Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(clientResetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); @@ -871,14 +1006,13 @@ public class GoAwayTest extends AbstractTest } }); MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); - CountDownLatch streamFailureLatch = new CountDownLatch(1); + CountDownLatch streamResetLatch = new CountDownLatch(1); clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter() { @Override - public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) + public void onReset(Stream stream, ResetFrame frame) { - streamFailureLatch.countDown(); - callback.succeeded(); + streamResetLatch.countDown(); } }); @@ -886,8 +1020,8 @@ public class GoAwayTest extends AbstractTest ((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); Assertions.assertTrue(serverGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(streamResetLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); - Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index ecee9d19988..43f3ce97925 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -757,6 +757,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio protected IStream createRemoteStream(int streamId) { + // This stream has been seen the server. + // Even if the stream cannot be created because this peer is closing, + // updating the lastRemoteStreamId ensures that in-flight HEADERS and + // DATA frames can be read (and discarded) without causing an error. + updateLastRemoteStreamId(streamId); + if (!streamsState.newRemoteStream(streamId)) { if (LOG.isDebugEnabled()) @@ -764,9 +770,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio return null; } - // This stream has been seen the server. - updateLastRemoteStreamId(streamId); - // SPEC: exceeding max concurrent streams is treated as stream error. while (true) { @@ -1423,8 +1426,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private long idleTime = System.nanoTime(); private CloseState closed = CloseState.NOT_CLOSED; private Runnable closingAction; - private GoAwayFrame goAwayRecv; - private GoAwayFrame goAwaySent; + private volatile GoAwayFrame goAwayRecv; + private volatile GoAwayFrame goAwaySent; private volatile Throwable failure; private Thread flushing; @@ -1739,7 +1742,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { String reason = "idle_timeout"; boolean notify = false; - Throwable cause = null; + boolean sendGoAway = false; try (Locker.Lock l = lock.lock()) { switch (closed) @@ -1752,22 +1755,24 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio notify = true; break; } - // Timed out while waiting for closing events, abort all the streams. + // Timed out while waiting for closing events, fail all the streams. case LOCALLY_CLOSED: { - boolean shouldSend = goAwaySent.isGraceful(); - goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason); + if (goAwaySent.isGraceful()) + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason); + sendGoAway = true; + } closed = CloseState.CLOSING; - closingAction = shouldSend ? () -> sendGoAwayAndTerminate(goAwaySent) : () -> terminate(goAwaySent); - failure = cause = new TimeoutException("Session idle timeout expired"); + failure = new TimeoutException("Session idle timeout expired"); break; } case REMOTELY_CLOSED: { goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason); closed = CloseState.CLOSING; - closingAction = () -> sendGoAwayAndTerminate(goAwaySent); - failure = cause = new TimeoutException("Session idle timeout expired"); + failure = new TimeoutException("Session idle timeout expired"); + sendGoAway = true; break; } default: @@ -1789,7 +1794,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio return false; } - abort(reason, cause, Callback.from(this::tryRunClosingAction)); + failStreams(stream -> true, reason, true); + if (sendGoAway) + sendGoAway(goAwaySent, Callback.NOOP); + notifyFailure(HTTP2Session.this, failure, Callback.NOOP); + terminate(goAwaySent); return false; }