diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index dec64e09cd0..b4509b8cca6 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -94,20 +94,6 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory this.listener = listener; } - @Override - public long getMessagesIn() - { - HTTP2ClientSession session = (HTTP2ClientSession)getSession(); - return session.getStreamsOpened(); - } - - @Override - public long getMessagesOut() - { - HTTP2ClientSession session = (HTTP2ClientSession)getSession(); - return session.getStreamsClosed(); - } - @Override public void onOpen() { @@ -127,15 +113,11 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory ISession session = getSession(); int windowDelta = client.getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE; + session.updateRecvWindow(windowDelta); if (windowDelta > 0) - { - session.updateRecvWindow(windowDelta); session.frames(null, List.of(prefaceFrame, settingsFrame, new WindowUpdateFrame(0, windowDelta)), this); - } else - { session.frames(null, List.of(prefaceFrame, settingsFrame), this); - } } @Override diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java index 2494c6df798..de325aa5ec7 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientSession.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.http2.client; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.CloseState; import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Session; @@ -62,7 +63,10 @@ public class HTTP2ClientSession extends HTTP2Session else { stream.process(frame, Callback.NOOP); + boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED); notifyHeaders(stream, frame); + if (closed) + removeStream(stream); } } else diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java index c1a8856507a..875b1828217 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java @@ -134,7 +134,7 @@ public class AsyncServletTest extends AbstractTest HeadersFrame frame = new HeadersFrame(metaData, null, true); FuturePromise promise = new FuturePromise<>(); CountDownLatch responseLatch = new CountDownLatch(1); - CountDownLatch resetLatch = new CountDownLatch(1); + CountDownLatch failLatch = new CountDownLatch(1); session.newStream(frame, promise, new Stream.Listener.Adapter() { @Override @@ -144,9 +144,10 @@ public class AsyncServletTest extends AbstractTest } @Override - public void onReset(Stream stream, ResetFrame frame) + public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) { - resetLatch.countDown(); + failLatch.countDown(); + callback.succeeded(); } }); Stream stream = promise.get(5, TimeUnit.SECONDS); @@ -154,7 +155,7 @@ public class AsyncServletTest extends AbstractTest assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS)); - assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + assertTrue(failLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); } @Test diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java index 4c1867008e8..f534a4fac54 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java @@ -137,10 +137,10 @@ public abstract class FlowControlStrategyTest @Test public void testWindowSizeUpdates() throws Exception { - final CountDownLatch prefaceLatch = new CountDownLatch(1); - final CountDownLatch stream1Latch = new CountDownLatch(1); - final CountDownLatch stream2Latch = new CountDownLatch(1); - final CountDownLatch settingsLatch = new CountDownLatch(1); + CountDownLatch prefaceLatch = new CountDownLatch(1); + CountDownLatch stream1Latch = new CountDownLatch(1); + CountDownLatch stream2Latch = new CountDownLatch(1); + CountDownLatch settingsLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() { @Override @@ -233,11 +233,11 @@ public abstract class FlowControlStrategyTest // then we change the window to 512 B. At this point, the client // must stop sending data (although the initial window allows it). - final int size = 512; + int size = 512; // We get 3 data frames: the first of 1024 and 2 of 512 each // after the flow control window has been reduced. - final CountDownLatch dataLatch = new CountDownLatch(3); - final AtomicReference callbackRef = new AtomicReference<>(); + CountDownLatch dataLatch = new CountDownLatch(3); + AtomicReference callbackRef = new AtomicReference<>(); start(new ServerSessionListener.Adapter() { @Override @@ -275,7 +275,7 @@ public abstract class FlowControlStrategyTest }); // Two SETTINGS frames, the initial one and the one we send from the server. - final CountDownLatch settingsLatch = new CountDownLatch(2); + CountDownLatch settingsLatch = new CountDownLatch(2); Session session = newClient(new Session.Listener.Adapter() { @Override @@ -312,9 +312,9 @@ public abstract class FlowControlStrategyTest @Test public void testServerFlowControlOneBigWrite() throws Exception { - final int windowSize = 1536; - final int length = 5 * windowSize; - final CountDownLatch settingsLatch = new CountDownLatch(2); + int windowSize = 1536; + int length = 5 * windowSize; + CountDownLatch settingsLatch = new CountDownLatch(2); start(new ServerSessionListener.Adapter() { @Override @@ -349,13 +349,13 @@ public abstract class FlowControlStrategyTest assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); - final CountDownLatch dataLatch = new CountDownLatch(1); - final Exchanger exchanger = new Exchanger<>(); + CountDownLatch dataLatch = new CountDownLatch(1); + Exchanger exchanger = new Exchanger<>(); MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY); HeadersFrame requestFrame = new HeadersFrame(metaData, null, true); session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter() { - private AtomicInteger dataFrames = new AtomicInteger(); + private final AtomicInteger dataFrames = new AtomicInteger(); @Override public void onData(Stream stream, DataFrame frame, Callback callback) @@ -406,10 +406,10 @@ public abstract class FlowControlStrategyTest @Test public void testClientFlowControlOneBigWrite() throws Exception { - final int windowSize = 1536; - final Exchanger exchanger = new Exchanger<>(); - final CountDownLatch settingsLatch = new CountDownLatch(1); - final CountDownLatch dataLatch = new CountDownLatch(1); + int windowSize = 1536; + Exchanger exchanger = new Exchanger<>(); + CountDownLatch settingsLatch = new CountDownLatch(1); + CountDownLatch dataLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() { @Override @@ -428,7 +428,7 @@ public abstract class FlowControlStrategyTest stream.headers(responseFrame, Callback.NOOP); return new Stream.Listener.Adapter() { - private AtomicInteger dataFrames = new AtomicInteger(); + private final AtomicInteger dataFrames = new AtomicInteger(); @Override public void onData(Stream stream, DataFrame frame, Callback callback) @@ -480,7 +480,7 @@ public abstract class FlowControlStrategyTest session.newStream(requestFrame, streamPromise, null); Stream stream = streamPromise.get(5, TimeUnit.SECONDS); - final int length = 5 * windowSize; + int length = 5 * windowSize; DataFrame dataFrame = new DataFrame(stream.getId(), ByteBuffer.allocate(length), true); stream.data(dataFrame, Callback.NOOP); @@ -499,7 +499,7 @@ public abstract class FlowControlStrategyTest assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); } - private void checkThatWeAreFlowControlStalled(Exchanger exchanger) throws Exception + private void checkThatWeAreFlowControlStalled(Exchanger exchanger) { assertThrows(TimeoutException.class, () -> exchanger.exchange(null, 1, TimeUnit.SECONDS)); @@ -508,7 +508,7 @@ public abstract class FlowControlStrategyTest @Test public void testSessionStalledStallsNewStreams() throws Exception { - final int windowSize = 1024; + int windowSize = 1024; start(new ServerSessionListener.Adapter() { @Override @@ -543,8 +543,8 @@ public abstract class FlowControlStrategyTest Session session = newClient(new Session.Listener.Adapter()); // First request is just to consume most of the session window. - final List callbacks1 = new ArrayList<>(); - final CountDownLatch prepareLatch = new CountDownLatch(1); + List callbacks1 = new ArrayList<>(); + CountDownLatch prepareLatch = new CountDownLatch(1); MetaData.Request request1 = newRequest("POST", HttpFields.EMPTY); session.newStream(new HeadersFrame(request1, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() { @@ -583,7 +583,7 @@ public abstract class FlowControlStrategyTest }); // Fourth request is now stalled. - final CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); MetaData.Request request4 = newRequest("GET", HttpFields.EMPTY); session.newStream(new HeadersFrame(request4, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() { @@ -612,7 +612,7 @@ public abstract class FlowControlStrategyTest @Test public void testServerSendsBigContent() throws Exception { - final byte[] data = new byte[1024 * 1024]; + byte[] data = new byte[1024 * 1024]; new Random().nextBytes(data); start(new ServerSessionListener.Adapter() @@ -636,8 +636,8 @@ public abstract class FlowControlStrategyTest Session session = newClient(new Session.Listener.Adapter()); MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY); HeadersFrame requestFrame = new HeadersFrame(metaData, null, true); - final byte[] bytes = new byte[data.length]; - final CountDownLatch latch = new CountDownLatch(1); + byte[] bytes = new byte[data.length]; + CountDownLatch latch = new CountDownLatch(1); session.newStream(requestFrame, new Promise.Adapter<>(), new Stream.Listener.Adapter() { private int received; @@ -681,7 +681,7 @@ public abstract class FlowControlStrategyTest } }); - final int initialWindow = 16; + int initialWindow = 16; Session session = newClient(new Session.Listener.Adapter() { @Override @@ -697,11 +697,11 @@ public abstract class FlowControlStrategyTest new Random().nextBytes(requestData); byte[] responseData = new byte[requestData.length]; - final ByteBuffer responseContent = ByteBuffer.wrap(responseData); + ByteBuffer responseContent = ByteBuffer.wrap(responseData); MetaData.Request metaData = newRequest("GET", HttpFields.EMPTY); HeadersFrame requestFrame = new HeadersFrame(metaData, null, false); Promise.Completable completable = new Promise.Completable<>(); - final CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); session.newStream(requestFrame, completable, new Stream.Listener.Adapter() { @Override @@ -730,6 +730,7 @@ public abstract class FlowControlStrategyTest public void testClientExceedingSessionWindow() throws Exception { // On server, we don't consume the data. + CountDownLatch serverCloseLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() { @Override @@ -744,16 +745,29 @@ public abstract class FlowControlStrategyTest } }; } - }); - final CountDownLatch closeLatch = new CountDownLatch(1); - Session session = newClient(new Session.Listener.Adapter() - { @Override public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) { if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code) - closeLatch.countDown(); + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); } }); @@ -764,7 +778,7 @@ public abstract class FlowControlStrategyTest session.newStream(requestFrame, Promise.from(completable), new Stream.Listener.Adapter()); Stream stream = completable.get(5, TimeUnit.SECONDS); ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE); - final CountDownLatch dataLatch = new CountDownLatch(1); + CountDownLatch dataLatch = new CountDownLatch(1); stream.data(new DataFrame(stream.getId(), data, false), new Callback() { @Override @@ -796,16 +810,19 @@ public abstract class FlowControlStrategyTest ByteBuffer extraData = ByteBuffer.allocate(1024); http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining()); List buffers = lease.getByteBuffers(); - http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()])); + http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0])); // Expect the connection to be closed. - assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); } @Test public void testClientExceedingStreamWindow() throws Exception { // On server, we don't consume the data. + CountDownLatch serverCloseLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() { @Override @@ -828,16 +845,29 @@ public abstract class FlowControlStrategyTest } }; } - }); - final CountDownLatch closeLatch = new CountDownLatch(1); - Session session = newClient(new Session.Listener.Adapter() - { @Override public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) { if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code) - closeLatch.countDown(); + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); } }); @@ -848,7 +878,7 @@ public abstract class FlowControlStrategyTest session.newStream(requestFrame, streamPromise, new Stream.Listener.Adapter()); Stream stream = streamPromise.get(5, TimeUnit.SECONDS); ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE); - final CountDownLatch dataLatch = new CountDownLatch(1); + CountDownLatch dataLatch = new CountDownLatch(1); stream.data(new DataFrame(stream.getId(), data, false), new Callback() { @Override @@ -876,10 +906,12 @@ public abstract class FlowControlStrategyTest ByteBuffer extraData = ByteBuffer.allocate(1024); http2Session.getGenerator().data(lease, new DataFrame(stream.getId(), extraData, true), extraData.remaining()); List buffers = lease.getByteBuffers(); - http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[buffers.size()])); + http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0])); // Expect the connection to be closed. - assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); } @Test @@ -916,7 +948,7 @@ public abstract class FlowControlStrategyTest MetaData.Request metaData = newRequest("POST", HttpFields.EMPTY); HeadersFrame frame = new HeadersFrame(metaData, null, false); FuturePromise streamPromise = new FuturePromise<>(); - final CountDownLatch resetLatch = new CountDownLatch(1); + CountDownLatch resetLatch = new CountDownLatch(1); session.newStream(frame, streamPromise, new Stream.Listener.Adapter() { @Override @@ -929,7 +961,7 @@ public abstract class FlowControlStrategyTest // Perform a big upload that will stall the flow control windows. ByteBuffer data = ByteBuffer.allocate(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE); - final CountDownLatch dataLatch = new CountDownLatch(1); + CountDownLatch dataLatch = new CountDownLatch(1); stream.data(new DataFrame(stream.getId(), data, true), new Callback() { @Override diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java new file mode 100644 index 00000000000..6585dc686ed --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java @@ -0,0 +1,1097 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under +// the terms of the Eclipse Public License 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0 +// +// This Source Code may also be made available under the following +// Secondary Licenses when the conditions for such availability set +// forth in the Eclipse Public License, v. 2.0 are satisfied: +// the Apache License v2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.nio.ByteBuffer; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.CloseState; +import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.FlowControlStrategy; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.ISession; +import org.eclipse.jetty.http2.IStream; +import org.eclipse.jetty.http2.SimpleFlowControlStrategy; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.GoAwayFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.component.LifeCycle; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class GoAwayTest extends AbstractTest +{ + @Test + public void testClientGoAwayServerReplies() throws Exception + { + CountDownLatch serverLatch = new CountDownLatch(1); + AtomicReference serverSessionRef = new AtomicReference<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverSessionRef.set(stream.getSession()); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + return null; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverLatch.countDown(); + } + }); + + CountDownLatch clientLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientLatch.countDown(); + } + }); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientSession.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + } + }); + + Assertions.assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertSame(CloseState.CLOSED, ((HTTP2Session)serverSessionRef.get()).getCloseState()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + Assertions.assertSame(CloseState.CLOSED, ((HTTP2Session)clientSession).getCloseState()); + } + + @Test + public void testServerGoAwayWithInFlightStreamClientFailsStream() throws Exception + { + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverSessionRef.set(stream.getSession()); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + MetaData.Request request1 = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + CountDownLatch streamFailureLatch = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request1, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + // Simulate the server closing while the client sends a second request. + // The server sends a lastStreamId for the first request, and discards the second. + serverSessionRef.get().close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + // The client sends the second request and should eventually fail it + // locally since it has a larger streamId, and the server discarded it. + MetaData.Request request2 = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + clientSession.newStream(new HeadersFrame(request2, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) + { + streamFailureLatch.countDown(); + callback.succeeded(); + } + }); + } + }); + + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAway() throws Exception + { + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + AtomicReference serverSessionRef = new AtomicReference<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverSessionRef.set(stream.getSession()); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (!frame.isGraceful()) + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + clientGracefulGoAwayLatch.countDown(); + else + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (!frame.isGraceful()) + clientCloseLatch.countDown(); + } + }); + CountDownLatch clientLatch = new CountDownLatch(1); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + } + }); + + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + + // Send a graceful GOAWAY from the server. + // Because the server had no pending streams, it will send also a non-graceful GOAWAY. + ((HTTP2Session)serverSessionRef.get()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception + { + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + AtomicReference serverSessionRef = new AtomicReference<>(); + AtomicReference serverStreamRef = new AtomicReference<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + Session session = stream.getSession(); + serverSessionRef.set(session); + + // Send a graceful GOAWAY while processing a stream. + ((HTTP2Session)session).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (!frame.isGraceful()) + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + clientGracefulGoAwayLatch.countDown(); + else + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (!frame.isGraceful()) + clientCloseLatch.countDown(); + } + }); + CountDownLatch clientLatch = new CountDownLatch(1); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + } + }); + + // Wait for the graceful GOAWAY. + Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + + // Now the client cannot create new streams. + FuturePromise streamPromise = new FuturePromise<>(); + clientSession.newStream(new HeadersFrame(newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY), null, true), streamPromise, null); + Assertions.assertThrows(ExecutionException.class, () -> streamPromise.get(5, TimeUnit.SECONDS)); + + // The client must not reply to a graceful GOAWAY. + Assertions.assertFalse(serverGoAwayLatch.await(1, TimeUnit.SECONDS)); + + // Previous streams must complete successfully. + Stream serverStream = serverStreamRef.get(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP); + + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + + // The server should have sent the GOAWAY after the last stream completed. + + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGoAwayWithStalledStreamServerConsumesDataOfInFlightStream() throws Exception + { + int flowControlWindow = 32 * 1024; + + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + AtomicInteger dataFrames = new AtomicInteger(); + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + // Do not consume the data for this stream (i.e. don't succeed the callback). + // Only send the response when receiving the first DATA frame. + if (dataFrames.incrementAndGet() == 1) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + } + } + }; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }, h2 -> + { + // Use the simple, predictable, strategy for window updates. + h2.setFlowControlStrategyFactory(SimpleFlowControlStrategy::new); + h2.setInitialSessionRecvWindow(flowControlWindow); + h2.setInitialStreamRecvWindow(flowControlWindow); + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + // This is necessary because the server session window is smaller than the + // default and the server cannot send a WINDOW_UPDATE with a negative value. + ((ISession)clientSession).updateSendWindow(flowControlWindow - FlowControlStrategy.DEFAULT_WINDOW_SIZE); + + MetaData.Request request1 = newRequest("GET", HttpFields.EMPTY); + HeadersFrame headersFrame1 = new HeadersFrame(request1, null, false); + DataFrame dataFrame1 = new DataFrame(ByteBuffer.allocate(flowControlWindow / 2), false); + ((ISession)clientSession).newStream(new IStream.FrameList(headersFrame1, dataFrame1, null), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream clientStream1, HeadersFrame frame) + { + // Send the server GOAWAY frame. + serverSessionRef.get().close(ErrorCode.NO_ERROR.code, null, Callback.NOOP); + + // Send a second, in-flight, stream with data, which + // will exhaust the client session flow control window. + // The server should consume the data even if it will drop + // this stream, so that the first stream can send more data. + MetaData.Request request2 = newRequest("POST", HttpFields.EMPTY); + HeadersFrame headersFrame2 = new HeadersFrame(request2, null, false); + DataFrame dataFrame2 = new DataFrame(ByteBuffer.allocate(flowControlWindow / 2), true); + ((ISession)clientStream1.getSession()).newStream(new IStream.FrameList(headersFrame2, dataFrame2, null), new Promise.Adapter<>() + { + @Override + public void succeeded(Stream clientStream2) + { + // After the in-flight stream is sent, try to complete the first stream. + // The client should receive the window update from + // the server and be able to complete this stream. + clientStream1.data(new DataFrame(clientStream1.getId(), ByteBuffer.allocate(flowControlWindow / 2), true), Callback.NOOP); + } + }, new Adapter()); + } + }); + + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testClientGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverStreamLatch = new CountDownLatch(1); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + serverStreamLatch.countDown(); + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + CountDownLatch clientLatch = new CountDownLatch(1); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + } + }); + + Assertions.assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS)); + + // The client sends a GOAWAY. + clientSession.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + + // The client must not receive a GOAWAY until the all streams are completed. + Assertions.assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS)); + + // Complete the stream. + Stream serverStream = serverStreamRef.get(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP); + + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAwayWithStreamsClientGoAwayServerClosesWhenLastStreamCloses() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverStreamLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + serverStreamLatch.countDown(); + + // Send a graceful GOAWAY while processing a stream. + ((HTTP2Session)stream.getSession()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + return null; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + { + // Send a GOAWAY when receiving a graceful GOAWAY. + session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + } + else + { + clientGoAwayLatch.countDown(); + } + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + CountDownLatch clientLatch = new CountDownLatch(1); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + } + }); + + // The server has a pending stream, so it does not send the non-graceful GOAWAY yet. + Assertions.assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS)); + + // Complete the stream, the server should send the non-graceful GOAWAY. + Stream serverStream = serverStreamRef.get(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP); + + // The server already received the client GOAWAY, + // so completing the last stream produces a close event. + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + // The client should receive the server non-graceful GOAWAY. + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testClientGracefulGoAwayWithStreamsServerGracefulGoAwayServerClosesWhenLastStreamCloses() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (frame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), callback); + } + else + { + callback.succeeded(); + } + } + }; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + { + // Send a graceful GOAWAY. + ((HTTP2Session)session).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + } + else + { + serverGoAwayLatch.countDown(); + } + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + clientGracefulGoAwayLatch.countDown(); + else + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + FuturePromise promise = new FuturePromise<>(); + clientSession.newStream(new HeadersFrame(request, null, false), promise, new Stream.Listener.Adapter()); + Stream clientStream = promise.get(5, TimeUnit.SECONDS); + + // Send a graceful GOAWAY from the client. + ((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + // The server should send a graceful GOAWAY. + Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + + // Complete the stream. + clientStream.data(new DataFrame(clientStream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); + + // Both client and server should send a non-graceful GOAWAY. + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverStreamRef.get().getSession()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testClientShutdownServerCloses() throws Exception + { + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverSessionRef.set(session); + serverCloseLatch.countDown(); + } + }); + + Session clientSession = newClient(new Session.Listener.Adapter()); + // TODO: get rid of sleep! + // Wait for the SETTINGS frames to be exchanged. + Thread.sleep(500); + + ((HTTP2Session)clientSession).getEndPoint().close(); + + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAwayClientShutdownServerCloses() throws Exception + { + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + // Reply to the graceful GOAWAY from the server with a TCP close. + ((HTTP2Session)session).getEndPoint().close(); + } + }); + // Wait for the SETTINGS frames to be exchanged. + Thread.sleep(500); + + // Send a graceful GOAWAY to the client. + ((HTTP2Session)serverSessionRef.get()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + } + + // TODO: add a shutdown test with pending stream. + + @Test + public void testServerIdleTimeout() throws Exception + { + long idleTimeout = 1000; + + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + ((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout); + } + + @Override + public boolean onIdleTimeout(Session session) + { + serverIdleTimeoutLatch.countDown(); + return true; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (!frame.isGraceful()) + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + Assertions.assertTrue(serverIdleTimeoutLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // Server should send a GOAWAY to the client. + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + // The client replied to server's GOAWAY, but the server already closed. + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAwayWithStreamsServerIdleTimeout() throws Exception + { + long idleTimeout = 1000; + + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + ((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout); + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + stream.setIdleTimeout(10 * idleTimeout); + // Send a graceful GOAWAY. + ((HTTP2Session)stream.getSession()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + return null; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + clientGracefulGoAwayLatch.countDown(); + else + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + CountDownLatch clientResetLatch = new CountDownLatch(1); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + // Send request headers but not data. + clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + clientResetLatch.countDown(); + } + }); + + Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + // Server idle timeout sends a non-graceful GOAWAY. + Assertions.assertTrue(clientResetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testClientGracefulGoAwayWithStreamsServerIdleTimeout() throws Exception + { + long idleTimeout = 1000; + + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + ((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout); + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + stream.setIdleTimeout(10 * idleTimeout); + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + serverGracefulGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + CountDownLatch streamResetLatch = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + streamResetLatch.countDown(); + } + }); + + // Client sends a graceful GOAWAY. + ((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + Assertions.assertTrue(serverGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(streamResetLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGoAwayWithStreamsThenStop() throws Exception + { + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverSessionRef.set(stream.getSession()); + // Don't reply, don't reset the stream, just send the GOAWAY. + stream.getSession().close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + return null; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + MetaData.Request request = newRequest(HttpMethod.GET.asString(), HttpFields.EMPTY); + CountDownLatch clientResetLatch = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + clientResetLatch.countDown(); + } + }); + + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + + // Neither the client nor the server are finishing + // the pending stream, so force the stop on the server. + LifeCycle.stop(serverSessionRef.get()); + + // The server should reset all the pending streams. + Assertions.assertTrue(clientResetLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java index a49cfe13488..4b3a67598ad 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/HTTP2Test.java @@ -930,9 +930,20 @@ public class HTTP2Test extends AbstractTest // Avoid aggressive idle timeout to allow the test verifications. connector.setShutdownIdleTimeout(connector.getIdleTimeout()); + CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); CountDownLatch clientCloseLatch = new CountDownLatch(1); Session clientSession = newClient(new Session.Listener.Adapter() { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + clientGracefulGoAwayLatch.countDown(); + else + clientGoAwayLatch.countDown(); + } + @Override public void onClose(Session session, GoAwayFrame frame) { @@ -977,26 +988,20 @@ public class HTTP2Test extends AbstractTest int port = connector.getLocalPort(); CompletableFuture shutdown = Graceful.shutdown(server); - // GOAWAY should not arrive to the client yet. - assertFalse(clientCloseLatch.await(1, TimeUnit.SECONDS)); + // Client should receive the graceful GOAWAY. + assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + // Client should not receive the non-graceful GOAWAY. + assertFalse(clientGoAwayLatch.await(500, TimeUnit.MILLISECONDS)); + // Client should not be closed yet. + assertFalse(clientCloseLatch.await(500, TimeUnit.MILLISECONDS)); - // New requests should be immediately rejected. + // Client cannot create new requests after receiving a GOAWAY. HostPortHttpField authority3 = new HostPortHttpField("localhost" + ":" + port); MetaData.Request metaData3 = new MetaData.Request("GET", HttpScheme.HTTP.asString(), authority3, servletPath, HttpVersion.HTTP_2, HttpFields.EMPTY, -1); - HeadersFrame request3 = new HeadersFrame(metaData3, null, false); + HeadersFrame request3 = new HeadersFrame(metaData3, null, true); FuturePromise promise3 = new FuturePromise<>(); - CountDownLatch resetLatch = new CountDownLatch(1); - clientSession.newStream(request3, promise3, new Stream.Listener.Adapter() - { - @Override - public void onReset(Stream stream, ResetFrame frame) - { - resetLatch.countDown(); - } - }); - Stream stream3 = promise3.get(5, TimeUnit.SECONDS); - stream3.data(new DataFrame(stream3.getId(), ByteBuffer.allocate(1), true), Callback.NOOP); - assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + clientSession.newStream(request3, promise3, new Stream.Listener.Adapter()); + assertThrows(ExecutionException.class, () -> promise3.get(5, TimeUnit.SECONDS)); // Finish the previous requests and expect the responses. stream1.data(new DataFrame(stream1.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); @@ -1005,9 +1010,9 @@ public class HTTP2Test extends AbstractTest assertNull(shutdown.get(5, TimeUnit.SECONDS)); // Now GOAWAY should arrive to the client. + assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); - // Wait to process the GOAWAY frames and close the EndPoints. - Thread.sleep(1000); + assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); assertFalse(((HTTP2Session)serverSession).getEndPoint().isOpen()); } diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java index d8724ee1608..4fa95608a11 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java @@ -84,8 +84,8 @@ public class SessionFailureTest extends AbstractTest @Override public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) { - // Forcibly close the connection. - ((HTTP2Session)stream.getSession()).getEndPoint().close(); + // Forcibly shutdown the output to fail the write below. + ((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput(); // Now try to write something: it should fail. stream.headers(frame, new Callback() { diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java index af90b1f7bd6..bb10e7f8f02 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java @@ -321,7 +321,9 @@ public class StreamCloseTest extends AbstractTest MetaData.Request request = (MetaData.Request)frame.getMetaData(); if ("GET".equals(request.getMethod())) { - ((HTTP2Session)stream.getSession()).getEndPoint().close(); + // Only shutdown the output, since closing the EndPoint causes a call to + // stop() on different thread which tries to concurrently fail the stream. + ((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput(); // Try to write something to force an error. stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.NOOP); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index caa55148459..916bd7f88ea 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -267,7 +267,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. { Runnable task = pollTask(); if (LOG.isDebugEnabled()) - LOG.debug("Dequeued task {}", task); + LOG.debug("Dequeued task {}", String.valueOf(task)); if (task != null) return task; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index f362645f751..f64e96c2e23 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -367,7 +367,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable // If the failure came from within the // flusher, we need to close the connection. if (closed == null) - session.abort(x); + session.onWriteFailure(x); } void terminate(Throwable cause) @@ -378,7 +378,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable closed = terminated; terminated = cause; if (LOG.isDebugEnabled()) - LOG.debug("{}", closed != null ? "Terminated" : "Terminating"); + LOG.debug("{} {}", closed != null ? "Terminated" : "Terminating", this); } if (closed == null) iterate(); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 563386e8b5c..80b0e22b88f 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -36,14 +36,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; -import org.eclipse.jetty.http2.frames.DisconnectFrame; import org.eclipse.jetty.http2.frames.FailureFrame; import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.FrameType; @@ -86,15 +86,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private final ConcurrentMap streams = new ConcurrentHashMap<>(); private final AtomicLong streamsOpened = new AtomicLong(); private final AtomicLong streamsClosed = new AtomicLong(); - private final StreamCreator streamCreator = new StreamCreator(); - private final AtomicBiInteger streamCount = new AtomicBiInteger(); // Hi = closed, Lo = stream count + private final StreamsState streamsState = new StreamsState(); private final AtomicInteger localStreamIds = new AtomicInteger(); private final AtomicInteger lastRemoteStreamId = new AtomicInteger(); private final AtomicInteger localStreamCount = new AtomicInteger(); private final AtomicBiInteger remoteStreamCount = new AtomicBiInteger(); private final AtomicInteger sendWindow = new AtomicInteger(); private final AtomicInteger recvWindow = new AtomicInteger(); - private final AtomicReference closed = new AtomicReference<>(CloseState.NOT_CLOSED); private final AtomicLong bytesWritten = new AtomicLong(); private final Scheduler scheduler; private final EndPoint endPoint; @@ -109,9 +107,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private int writeThreshold; private boolean pushEnabled; private boolean connectProtocolEnabled; - private long idleTime; - private GoAwayFrame closeFrame; - private Callback.Completable shutdownCallback; public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId) { @@ -124,13 +119,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio this.maxLocalStreams = -1; this.maxRemoteStreams = -1; this.localStreamIds.set(initialStreamId); - this.lastRemoteStreamId.set(isClientStream(initialStreamId) ? 0 : -1); this.streamIdleTimeout = endPoint.getIdleTimeout(); this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.writeThreshold = 32 * 1024; this.pushEnabled = true; // SPEC: by default, push is enabled. - this.idleTime = System.nanoTime(); addBean(flowControl); addBean(flusher); } @@ -139,26 +132,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio protected void doStop() throws Exception { super.doStop(); - close(ErrorCode.NO_ERROR.code, "stop", new Callback() - { - @Override - public void succeeded() - { - disconnect(); - } - - @Override - public void failed(Throwable x) - { - disconnect(); - } - - @Override - public InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } - }); + streamsState.halt("stop"); } @ManagedAttribute(value = "The flow control strategy", readonly = true) @@ -257,7 +231,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } @Override - public void onData(final DataFrame frame, Callback callback) + public void onData(DataFrame frame, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("Received {} on {}", frame, this); @@ -273,9 +247,22 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio if (stream != null) { if (getRecvWindow() < 0) - onConnectionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", callback); + { + onSessionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", callback); + } else - stream.process(frame, new DataCallback(callback, stream, flowControlLength)); + { + if (stream.updateRecvWindow(0) < 0) + { + // It's a bad client, it does not deserve to be + // treated gently by just resetting the stream. + onSessionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", callback); + } + else + { + stream.process(frame, new DataCallback(callback, stream, flowControlLength)); + } + } } else { @@ -285,9 +272,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio // otherwise other requests will be stalled. flowControl.onDataConsumed(this, null, flowControlLength); if (isStreamClosed(streamId)) - reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), callback); + reset(null, new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), callback); else - onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_data_frame", callback); + onSessionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_data_frame", callback); } } @@ -394,7 +381,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio case SettingsFrame.INITIAL_WINDOW_SIZE: { if (LOG.isDebugEnabled()) - LOG.debug("Updating initial window size to {} for {}", value, this); + LOG.debug("Updating initial stream window size to {} for {}", value, this); flowControl.updateInitialStreamWindow(this, value, false); break; } @@ -455,16 +442,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } /** - * This method is called when receiving a GO_AWAY from the other peer. - * We check the close state to act appropriately: - *
    - *
  • NOT_CLOSED: we move to REMOTELY_CLOSED and queue a disconnect, so - * that the content of the queue is written, and then the connection - * closed. We notify the application after being terminated. - * See {@code HTTP2Session.ControlEntry#succeeded()}
  • - *
  • In all other cases, we do nothing since other methods are already - * performing their actions.
  • - *
+ *

This method is called when receiving a GO_AWAY from the other peer.

* * @param frame the GO_AWAY frame that has been received. * @see #close(int, String, Callback) @@ -472,22 +450,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio * @see #onIdleTimeout() */ @Override - public void onGoAway(final GoAwayFrame frame) + public void onGoAway(GoAwayFrame frame) { if (LOG.isDebugEnabled()) LOG.debug("Received {} on {}", frame, this); - - if (closed.compareAndSet(CloseState.NOT_CLOSED, CloseState.REMOTELY_CLOSED)) - { - // We received a GO_AWAY, so try to write - // what's in the queue and then disconnect. - closeFrame = frame; - onClose(frame, new DisconnectCallback()); - return; - } - - if (LOG.isDebugEnabled()) - LOG.debug("Ignored {}, already closed", frame); + streamsState.onGoAway(frame); } @Override @@ -506,7 +473,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio int streamSendWindow = stream.updateSendWindow(0); if (MathUtils.sumOverflows(streamSendWindow, windowDelta)) { - reset(new ResetFrame(streamId, ErrorCode.FLOW_CONTROL_ERROR.code), Callback.NOOP); + reset(stream, new ResetFrame(streamId, ErrorCode.FLOW_CONTROL_ERROR.code), Callback.NOOP); } else { @@ -548,18 +515,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void onStreamFailure(int streamId, int error, String reason) { - Callback callback = new ResetCallback(streamId, error, Callback.NOOP); + Callback callback = Callback.from(() -> reset(getStream(streamId), new ResetFrame(streamId, error), Callback.NOOP)); Throwable failure = toFailure(error, reason); if (LOG.isDebugEnabled()) LOG.debug("Stream #{} failure {}", streamId, this, failure); - onStreamFailure(streamId, error, reason, failure, callback); - } - - private void onStreamFailure(int streamId, int error, String reason, Throwable failure, Callback callback) - { IStream stream = getStream(streamId); if (stream != null) - stream.process(new FailureFrame(error, reason, failure), callback); + failStream(stream, error, reason, failure, callback); else callback.succeeded(); } @@ -567,22 +529,24 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void onConnectionFailure(int error, String reason) { - onConnectionFailure(error, reason, Callback.NOOP); + onSessionFailure(error, reason, Callback.NOOP); } - protected void onConnectionFailure(int error, String reason, Callback callback) + private void onSessionFailure(int error, String reason, Callback callback) { - Throwable failure = toFailure(error, reason); - if (LOG.isDebugEnabled()) - LOG.debug("Session failure {}", this, failure); - onFailure(error, reason, failure, new FailureCallback(error, reason, callback)); + streamsState.onSessionFailure(error, reason, callback); } - protected void abort(Throwable failure) + void onWriteFailure(Throwable failure) + { + streamsState.onWriteFailure(failure); + } + + protected void abort(String reason, Throwable failure, Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("Session abort {}", this, failure); - onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure)); + LOG.debug("Session abort {} for {}", reason, this, failure); + onFailure(ErrorCode.NO_ERROR.code, reason, failure, callback); } private void onFailure(int error, String reason, Throwable failure, Callback callback) @@ -592,26 +556,35 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio Callback countCallback = new CountingCallback(callback, count + 1); for (Stream stream : streams) { - onStreamFailure(stream.getId(), error, reason, failure, countCallback); + if (stream.isClosed()) + countCallback.succeeded(); + else + failStream(stream, error, reason, failure, countCallback); } notifyFailure(this, failure, countCallback); } - private void onClose(GoAwayFrame frame, Callback callback) + private void failStreams(Predicate matcher, String reason, boolean reset) { - int error = frame.getError(); - String reason = frame.tryConvertPayload(); + int error = ErrorCode.CANCEL_STREAM_ERROR.code; Throwable failure = toFailure(error, reason); - if (LOG.isDebugEnabled()) - LOG.debug("Session close {}", this, failure); - Collection streams = getStreams(); - int count = streams.size(); - Callback countCallback = new CountingCallback(callback, count + 1); - for (Stream stream : streams) + for (Stream stream : getStreams()) { - onStreamFailure(stream.getId(), error, reason, failure, countCallback); + if (stream.isClosed()) + continue; + if (!matcher.test((IStream)stream)) + continue; + if (LOG.isDebugEnabled()) + LOG.debug("Failing stream {} of {}", stream, this); + failStream(stream, error, reason, failure, Callback.NOOP); + if (reset) + stream.reset(new ResetFrame(stream.getId(), error), Callback.NOOP); } - notifyClose(this, frame, countCallback); + } + + private void failStream(Stream stream, int error, String reason, Throwable failure, Callback callback) + { + ((IStream)stream).process(new FailureFrame(error, reason, failure), callback); } private Throwable toFailure(int error, String reason) @@ -628,33 +601,30 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void newStream(IStream.FrameList frames, Promise promise, Stream.Listener listener) { - streamCreator.newStream(frames, promise, listener); + streamsState.newLocalStream(frames, promise, listener); } /** *

Creates a new stream allocating a stream id if the given HEADERS frame does not have one.

- *

The new HEADERS frame with the newly allocated stream id is returned as the first element - * of the array parameter.

* - * @param frameIn the HEADERS frame that triggered the stream creation - * @param frameOut an array of size 1 to return the HEADERS frame with the newly + * @param frame the HEADERS frame that triggered the stream creation * allocated stream id, or null if not interested in the modified headers frame - * @return a new stream + * @param listener the listener that gets notified of stream events */ - public IStream newLocalStream(HeadersFrame frameIn, HeadersFrame[] frameOut) + public void newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Promise promise) { - HeadersFrame frame = frameIn; + streamsState.newUpgradeStream(frame, listener, promise); +/* + // TODO: cannot do this, we need to call StreamsState. + HeadersFrame frame = frame; int streamId = frame.getStreamId(); if (streamId <= 0) { streamId = localStreamIds.getAndAdd(2); frame = frame.withStreamId(streamId); } - - if (frameOut != null) - frameOut[0] = frame; - return createLocalStream(streamId, (MetaData.Request)frame.getMetaData()); + */ } protected IStream newStream(int streamId, MetaData.Request request, boolean local) @@ -665,13 +635,23 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public int priority(PriorityFrame frame, Callback callback) { - return streamCreator.priority(frame, callback); + return streamsState.priority(frame, callback); } @Override public void push(IStream stream, Promise promise, PushPromiseFrame frame, Stream.Listener listener) { - streamCreator.push(frame, promise, listener); + streamsState.push(frame, new Promise.Wrapper<>(promise) + { + @Override + public void succeeded(Stream pushedStream) + { + // Pushed streams are implicitly remotely closed. + // They are closed when sending an end-stream DATA frame. + ((IStream)pushedStream).updateClose(true, CloseState.Event.RECEIVED); + super.succeeded(pushedStream); + } + }, listener); } @Override @@ -689,26 +669,20 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio control(null, callback, frame); } - protected void reset(ResetFrame frame, Callback callback) + void reset(IStream stream, ResetFrame frame, Callback callback) { - control(getStream(frame.getStreamId()), callback, frame); + control(stream, Callback.from(() -> + { + if (stream != null) + { + stream.close(); + removeStream(stream); + } + }, callback), frame); } /** - * Invoked internally and by applications to send a GO_AWAY frame to the - * other peer. We check the close state to act appropriately: - *
    - *
  • NOT_CLOSED: we move to LOCALLY_CLOSED and queue a GO_AWAY. When the - * GO_AWAY has been written, it will only cause the output to be shut - * down (not the connection closed), so that the application can still - * read frames arriving from the other peer. - * Ideally the other peer will notice the GO_AWAY and close the connection. - * When that happen, we close the connection from {@link #onShutdown()}. - * Otherwise, the idle timeout mechanism will close the connection, see - * {@link #onIdleTimeout()}.
  • - *
  • In all other cases, we do nothing since other methods are already - * performing their actions.
  • - *
+ *

Invoked internally and by applications to send a GO_AWAY frame to the other peer.

* * @param error the error code * @param reason the reason @@ -720,45 +694,28 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public boolean close(int error, String reason, Callback callback) { - if (closed.compareAndSet(CloseState.NOT_CLOSED, CloseState.LOCALLY_CLOSED)) - { - if (LOG.isDebugEnabled()) - LOG.debug("Closing {}/{} {}", error, reason, this); - closeFrame = newGoAwayFrame(CloseState.LOCALLY_CLOSED, error, reason); - control(null, callback, closeFrame); - return true; - } - if (LOG.isDebugEnabled()) - LOG.debug("Ignoring close {}/{}, already closed {}", error, reason, this); - callback.succeeded(); - return false; + LOG.debug("Closing {}/{} {}", ErrorCode.toString(error, null), reason, this); + return goAway(newGoAwayFrame(error, reason), callback); } @Override public CompletableFuture shutdown() { - if (closed.compareAndSet(CloseState.NOT_CLOSED, CloseState.LOCALLY_CLOSED)) - { - if (LOG.isDebugEnabled()) - LOG.debug("Shutting down {}", this); - closeFrame = newGoAwayFrame(CloseState.LOCALLY_CLOSED, ErrorCode.NO_ERROR.code, "shutdown"); - shutdownCallback = new Callback.Completable(); - // Only send the close frame when we can flip Hi and Lo = 0, see onStreamClosed(). - if (streamCount.compareAndSet(0, 1, 0, 0)) - control(null, shutdownCallback, closeFrame); - return shutdownCallback; - } - - if (LOG.isDebugEnabled()) - LOG.debug("Ignoring shutdown, already closed"); - Callback.Completable result = shutdownCallback; - // Result may be null if the shutdown is in progress, - // don't wait and return a completed CompletableFuture. - return result != null ? result : CompletableFuture.completedFuture(null); + return streamsState.shutdown(); } - private GoAwayFrame newGoAwayFrame(CloseState closeState, int error, String reason) + public boolean goAway(GoAwayFrame frame, Callback callback) + { + return streamsState.goAway(frame, callback); + } + + private GoAwayFrame newGoAwayFrame(int error, String reason) + { + return newGoAwayFrame(getLastRemoteStreamId(), error, reason); + } + + private GoAwayFrame newGoAwayFrame(int lastRemoteStreamId, int error, String reason) { byte[] payload = null; if (reason != null) @@ -767,13 +724,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio reason = reason.substring(0, Math.min(reason.length(), 32)); payload = reason.getBytes(StandardCharsets.UTF_8); } - return new GoAwayFrame(closeState, getLastRemoteStreamId(), error, payload); + return new GoAwayFrame(lastRemoteStreamId, error, payload); } @Override public boolean isClosed() { - return closed.get() != CloseState.NOT_CLOSED; + return getCloseState() != CloseState.NOT_CLOSED; + } + + public CloseState getCloseState() + { + return streamsState.getCloseState(); } private void control(IStream stream, Callback callback, Frame frame) @@ -828,15 +790,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - protected IStream createLocalStream(int streamId, MetaData.Request request) + protected IStream createLocalStream(int streamId, MetaData.Request request, Promise promise) { while (true) { int localCount = localStreamCount.get(); int maxCount = getMaxLocalStreams(); if (maxCount >= 0 && localCount >= maxCount) - // TODO: remove the dump() in the exception message. - throw new IllegalStateException("Max local stream count " + maxCount + " exceeded" + System.lineSeparator() + dump()); + { + promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded")); + return null; + } if (localStreamCount.compareAndSet(localCount, localCount + 1)) break; } @@ -853,12 +817,26 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio else { localStreamCount.decrementAndGet(); - throw new IllegalStateException("Duplicate stream " + streamId); + promise.failed(new IllegalStateException("Duplicate stream " + streamId)); + return null; } } protected IStream createRemoteStream(int streamId, MetaData.Request request) { + // This stream has been seen the server. + // Even if the stream cannot be created because this peer is closing, + // updating the lastRemoteStreamId ensures that in-flight HEADERS and + // DATA frames can be read (and discarded) without causing an error. + updateLastRemoteStreamId(streamId); + + if (!streamsState.newRemoteStream(streamId)) + { + if (LOG.isDebugEnabled()) + LOG.debug("Could not create remote stream #{} for {}", streamId, this); + return null; + } + // SPEC: exceeding max concurrent streams is treated as stream error. while (true) { @@ -868,8 +846,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio int maxCount = getMaxRemoteStreams(); if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) { - updateLastRemoteStreamId(streamId); - reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP); + reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId))); return null; } if (remoteStreamCount.compareAndSet(encoded, remoteCount + 1, remoteClosing)) @@ -877,11 +854,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } IStream stream = newStream(streamId, request, false); - - // SPEC: duplicate stream is treated as connection error. if (streams.putIfAbsent(streamId, stream) == null) { - updateLastRemoteStreamId(streamId); stream.setIdleTimeout(getStreamIdleTimeout()); flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) @@ -891,6 +865,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio else { remoteStreamCount.addAndGetHi(-1); + onStreamDestroyed(streamId); + // SPEC: duplicate stream is treated as connection error. onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream"); return null; } @@ -905,16 +881,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } @Override - public void removeStream(IStream stream) + public boolean removeStream(IStream stream) { - IStream removed = streams.remove(stream.getId()); - if (removed != null) - { - onStreamClosed(stream); - flowControl.onStreamDestroyed(stream); - if (LOG.isDebugEnabled()) - LOG.debug("Removed {} {} from {}", stream.isLocal() ? "local" : "remote", stream, this); - } + int streamId = stream.getId(); + IStream removed = streams.remove(streamId); + if (removed == null) + return false; + if (LOG.isDebugEnabled()) + LOG.debug("Removed {} {} from {}", stream.isLocal() ? "local" : "remote", stream, this); + onStreamClosed(stream); + flowControl.onStreamDestroyed(stream); + onStreamDestroyed(streamId); + return true; } @Override @@ -926,7 +904,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @ManagedAttribute("The number of active streams") public int getStreamCount() { - return streams.size(); + return streamsState.streamCount.intValue(); } @Override @@ -990,21 +968,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } /** - * A typical close by a remote peer involves a GO_AWAY frame followed by TCP FIN. - * This method is invoked when the TCP FIN is received, or when an exception is - * thrown while reading, and we check the close state to act appropriately: - *
    - *
  • NOT_CLOSED: means that the remote peer did not send a GO_AWAY (abrupt close) - * or there was an exception while reading, and therefore we terminate.
  • - *
  • LOCALLY_CLOSED: we have sent the GO_AWAY to the remote peer, which received - * it and closed the connection; we queue a disconnect to close the connection - * on the local side. - * The GO_AWAY just shutdown the output, so we need this step to make sure the - * connection is closed. See {@link #close(int, String, Callback)}.
  • - *
  • REMOTELY_CLOSED: we received the GO_AWAY, and the TCP FIN afterwards, so we - * do nothing since the handling of the GO_AWAY will take care of closing the - * connection. See {@link #onGoAway(GoAwayFrame)}.
  • - *
+ *

This method is called when the TCP FIN is received from the remote peer.

* * @see #onGoAway(GoAwayFrame) * @see #close(int, String, Callback) @@ -1013,53 +977,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void onShutdown() { - if (LOG.isDebugEnabled()) - LOG.debug("Shutting down {}", this); - - switch (closed.get()) - { - case NOT_CLOSED: - { - // The other peer did not send a GO_AWAY, no need to be gentle. - if (LOG.isDebugEnabled()) - LOG.debug("Abrupt close for {}", this); - abort(new ClosedChannelException()); - break; - } - case LOCALLY_CLOSED: - { - // We have closed locally, and only shutdown - // the output; now queue a disconnect. - control(null, Callback.NOOP, new DisconnectFrame()); - break; - } - case REMOTELY_CLOSED: - { - // Nothing to do, the GO_AWAY frame we - // received will close the connection. - break; - } - default: - { - break; - } - } + streamsState.onShutdown(); } /** - * This method is invoked when the idle timeout triggers. We check the close state - * to act appropriately: - *
    - *
  • NOT_CLOSED: it's a real idle timeout, we just initiate a close, see - * {@link #close(int, String, Callback)}.
  • - *
  • LOCALLY_CLOSED: we have sent a GO_AWAY and only shutdown the output, but the - * other peer did not close the connection so we never received the TCP FIN, and - * therefore we terminate.
  • - *
  • REMOTELY_CLOSED: the other peer sent us a GO_AWAY, we should have queued a - * disconnect, but for some reason it was not processed (for example, queue was - * stuck because of TCP congestion), therefore we terminate. - * See {@link #onGoAway(GoAwayFrame)}.
  • - *
+ *

This method is invoked when the idle timeout expires.

* * @return true if the session should be closed, false otherwise * @see #onGoAway(GoAwayFrame) @@ -1069,31 +991,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public boolean onIdleTimeout() { - switch (closed.get()) - { - case NOT_CLOSED: - { - long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime); - if (elapsed < endPoint.getIdleTimeout()) - return false; - return notifyIdleTimeout(this); - } - case LOCALLY_CLOSED: - case REMOTELY_CLOSED: - { - abort(new TimeoutException("Idle timeout " + endPoint.getIdleTimeout() + " ms")); - return false; - } - default: - { - return false; - } - } + return streamsState.onIdleTimeout(); } private void notIdle() { - idleTime = System.nanoTime(); + streamsState.idleTime = System.nanoTime(); } @Override @@ -1102,32 +1005,32 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade"); } - protected void onStreamOpened(IStream stream) + private void onStreamCreated(int streamId) { - streamsOpened.incrementAndGet(); - streamCount.addAndGetLo(1); + if (LOG.isDebugEnabled()) + LOG.debug("Created stream #{} for {}", streamId, this); + streamsState.onStreamCreated(); } - protected void onStreamClosed(IStream stream) + protected final void onStreamOpened(IStream stream) { + if (LOG.isDebugEnabled()) + LOG.debug("Opened stream {} for {}", stream, this); + streamsOpened.incrementAndGet(); + } + + private void onStreamClosed(IStream stream) + { + if (LOG.isDebugEnabled()) + LOG.debug("Closed stream {} for {}", stream, this); streamsClosed.incrementAndGet(); - Callback callback = null; - while (true) - { - long encoded = streamCount.get(); - int closed = AtomicBiInteger.getHi(encoded); - int streams = AtomicBiInteger.getLo(encoded) - 1; - if (streams == 0 && closed == 0) - { - callback = shutdownCallback; - closed = 1; - } - if (streamCount.compareAndSet(encoded, closed, streams)) - break; - } - // Only send the close frame if we can flip Hi and Lo = 0, see shutdown(). - if (callback != null) - control(null, callback, closeFrame); + } + + private void onStreamDestroyed(int streamId) + { + if (LOG.isDebugEnabled()) + LOG.debug("Destroyed stream #{} for {}", streamId, this); + streamsState.onStreamDestroyed(); } @Override @@ -1136,6 +1039,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio flusher.onFlushed(bytes); } + private void terminate(Throwable cause) + { + flusher.terminate(cause); + disconnect(); + } + public void disconnect() { if (LOG.isDebugEnabled()) @@ -1143,38 +1052,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio endPoint.close(); } - private void terminate(Throwable cause) - { - while (true) - { - CloseState current = closed.get(); - switch (current) - { - case NOT_CLOSED: - case LOCALLY_CLOSED: - case REMOTELY_CLOSED: - { - if (closed.compareAndSet(current, CloseState.CLOSED)) - { - flusher.terminate(cause); - for (IStream stream : streams.values()) - { - stream.close(); - } - streams.clear(); - disconnect(); - return; - } - break; - } - default: - { - return; - } - } - } - } - public boolean isDisconnected() { return !endPoint.isOpen(); @@ -1239,6 +1116,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } + protected void notifyGoAway(Session session, GoAwayFrame frame) + { + try + { + listener.onGoAway(session, frame); + } + catch (Throwable x) + { + LOG.info("Failure while notifying listener " + listener, x); + } + } + protected void notifyClose(Session session, GoAwayFrame frame, Callback callback) { try @@ -1306,16 +1195,15 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public String toString() { - return String.format("%s@%x{l:%s <-> r:%s,sendWindow=%s,recvWindow=%s,streams=%d,%s,%s}", + return String.format("%s@%x{local:%s,remote:%s,sendWindow=%s,recvWindow=%s,%s}", getClass().getSimpleName(), hashCode(), getEndPoint().getLocalAddress(), getEndPoint().getRemoteAddress(), sendWindow, recvWindow, - streams.size(), - closed, - closeFrame); + streamsState + ); } private class ControlEntry extends HTTP2Flusher.Entry @@ -1412,39 +1300,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio removeStream(stream); break; } - case RST_STREAM: - { - if (stream != null) - { - stream.close(); - removeStream(stream); - } - break; - } - case PUSH_PROMISE: - { - // Pushed streams are implicitly remotely closed. - // They are closed when sending an end-stream DATA frame. - stream.updateClose(true, CloseState.Event.RECEIVED); - break; - } - case GO_AWAY: - { - // We just sent a GO_AWAY, only shutdown the - // output without closing yet, to allow reads. - getEndPoint().shutdownOutput(); - break; - } case WINDOW_UPDATE: { flowControl.windowUpdate(HTTP2Session.this, stream, (WindowUpdateFrame)frame); break; } - case DISCONNECT: - { - terminate(new ClosedChannelException()); - break; - } default: { break; @@ -1453,14 +1313,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio super.succeeded(); } - - @Override - public void failed(Throwable x) - { - if (frame.getType() == FrameType.DISCONNECT) - terminate(new ClosedChannelException()); - super.failed(x); - } } private class DataEntry extends HTTP2Flusher.Entry @@ -1563,30 +1415,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - private static class StreamPromiseCallback implements Callback - { - private final Promise promise; - private final IStream stream; - - private StreamPromiseCallback(Promise promise, IStream stream) - { - this.promise = promise; - this.stream = stream; - } - - @Override - public void succeeded() - { - promise.succeeded(stream); - } - - @Override - public void failed(Throwable x) - { - promise.failed(x); - } - } - private class DataCallback extends Callback.Nested { private final IStream stream; @@ -1623,38 +1451,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - private class ResetCallback extends Callback.Nested - { - private final int streamId; - private final int error; - - private ResetCallback(int streamId, int error, Callback callback) - { - super(callback); - this.streamId = streamId; - this.error = error; - } - - @Override - public void succeeded() - { - complete(); - } - - @Override - public void failed(Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Reset failed", x); - complete(); - } - - private void complete() - { - reset(new ResetFrame(streamId, error), getCallback()); - } - } - private class OnResetCallback implements Callback { @Override @@ -1683,221 +1479,779 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - private class FailureCallback extends Callback.Nested - { - private final int error; - private final String reason; - - private FailureCallback(int error, String reason, Callback callback) - { - super(callback); - this.error = error; - this.reason = reason; - } - - @Override - public void succeeded() - { - complete(); - } - - @Override - public void failed(Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("FailureCallback failed", x); - complete(); - } - - private void complete() - { - close(error, reason, getCallback()); - } - } - - private class DisconnectCallback implements Callback - { - @Override - public void succeeded() - { - complete(); - } - - @Override - public void failed(Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("DisconnectCallback failed", x); - complete(); - } - - @Override - public InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } - - private void complete() - { - frames(null, List.of(newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()), Callback.NOOP); - } - } - - private class TerminateCallback implements Callback - { - private final Throwable failure; - - private TerminateCallback(Throwable failure) - { - this.failure = failure; - } - - @Override - public void succeeded() - { - complete(); - } - - @Override - public void failed(Throwable x) - { - if (x != failure) - failure.addSuppressed(x); - complete(); - } - - @Override - public InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } - - private void complete() - { - terminate(failure); - } - } - /** - * SPEC: It is required that stream ids are monotonically increasing. - * Here we use a queue to atomically create the stream id and - * claim the slot in the queue. Concurrent threads will only - * flush up to the slot with a non-null entry to make sure - * frames are sent strictly in their stream id order. - * See https://tools.ietf.org/html/rfc7540#section-5.1.1. + *

The HTTP/2 specification requires that stream ids are monotonically increasing, + * see https://tools.ietf.org/html/rfc7540#section-5.1.1.

+ *

This implementation uses a queue to atomically reserve a stream id and claim + * a slot in the queue; the slot is then assigned the entries to write.

+ *

Concurrent threads push slots in the queue but only one thread flushes + * the slots, up to the slot that has a non-null entries to write, therefore + * guaranteeing that frames are sent strictly in their stream id order.

+ *

This class also coordinates the creation of streams with the close of + * the session, see https://tools.ietf.org/html/rfc7540#section-6.8.

*/ - private class StreamCreator + private class StreamsState { private final AutoLock lock = new AutoLock(); private final Queue slots = new ArrayDeque<>(); + // Must be incremented with the lock held. + private final AtomicLong streamCount = new AtomicLong(); + private long idleTime = System.nanoTime(); + private CloseState closed = CloseState.NOT_CLOSED; + private Runnable zeroStreamsAction; + private GoAwayFrame goAwayRecv; + private GoAwayFrame goAwaySent; + private Throwable failure; private Thread flushing; + private CompletableFuture shutdownCallback; + + private CloseState getCloseState() + { + try (AutoLock l = lock.lock()) + { + return closed; + } + } + + private CompletableFuture shutdown() + { + CompletableFuture future; + try (AutoLock l = lock.lock()) + { + if (shutdownCallback != null) + return shutdownCallback; + shutdownCallback = future = new Callback.Completable(); + } + goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + return future; + } + + private boolean goAway(GoAwayFrame frame, Callback callback) + { + boolean sendGoAway = false; + boolean tryRunZeroStreamsAction = false; + try (AutoLock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + { + goAwaySent = frame; + closed = CloseState.LOCALLY_CLOSED; + sendGoAway = true; + if (frame.isGraceful()) + { + // Try to send the non-graceful GOAWAY + // when the last stream is destroyed. + zeroStreamsAction = () -> + { + GoAwayFrame goAwayFrame = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + goAway(goAwayFrame, Callback.NOOP); + }; + tryRunZeroStreamsAction = streamCount.get() == 0; + } + break; + } + case LOCALLY_CLOSED: + { + if (frame.isGraceful()) + { + // Trying to send a non-first, but graceful, GOAWAY, ignore this one. + if (LOG.isDebugEnabled()) + LOG.debug("Already sent, ignored GOAWAY {} for {}", frame, HTTP2Session.this); + } + else + { + // SPEC: see section 6.8. + if (goAwaySent.isGraceful() || + frame.getLastStreamId() < goAwaySent.getLastStreamId() || + frame.getError() != ErrorCode.NO_ERROR.code) + { + goAwaySent = frame; + sendGoAway = true; + } + else + { + // Trying to send another non-graceful GOAWAY, ignore this one. + if (LOG.isDebugEnabled()) + LOG.debug("Already sent, ignored GOAWAY {} for {}", frame, HTTP2Session.this); + } + } + break; + } + case REMOTELY_CLOSED: + { + goAwaySent = frame; + sendGoAway = true; + if (frame.isGraceful()) + { + // Try to send the non-graceful GOAWAY + // when the last stream is destroyed. + zeroStreamsAction = () -> + { + GoAwayFrame goAwayFrame = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + goAway(goAwayFrame, Callback.NOOP); + }; + tryRunZeroStreamsAction = streamCount.get() == 0; + } + else + { + if (goAwayRecv.isGraceful()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Waiting non-graceful GOAWAY for {}", HTTP2Session.this); + } + else + { + closed = CloseState.CLOSING; + zeroStreamsAction = () -> terminate(frame); + tryRunZeroStreamsAction = streamCount.get() == 0; + } + } + break; + } + default: + { + // Already closing or closed, ignore it. + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignored {} for {}", frame, HTTP2Session.this); + break; + } + } + } + + if (sendGoAway) + { + if (tryRunZeroStreamsAction) + sendGoAway(frame, Callback.from(callback, this::tryRunZeroStreamsAction)); + else + sendGoAway(frame, callback); + return true; + } + else + { + callback.succeeded(); + return false; + } + } + + private void halt(String reason) + { + if (LOG.isDebugEnabled()) + LOG.debug("Halting ({}) for {}", reason, HTTP2Session.this); + GoAwayFrame goAwayFrame = null; + GoAwayFrame goAwayFrameEvent; + try (AutoLock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + case REMOTELY_CLOSED: + case LOCALLY_CLOSED: + case CLOSING: + { + if (goAwaySent == null || goAwaySent.isGraceful()) + goAwaySent = goAwayFrame = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason); + goAwayFrameEvent = goAwayRecv != null ? goAwayRecv : goAwaySent; + closed = CloseState.CLOSED; + zeroStreamsAction = null; + if (failure != null) + failure = toFailure(ErrorCode.NO_ERROR.code, reason); + break; + } + default: + { + return; + } + } + } + failStreams(stream -> true, reason, true); + if (goAwayFrame != null) + sendGoAwayAndTerminate(goAwayFrame, goAwayFrameEvent); + else + terminate(goAwayFrameEvent); + } + + private void onGoAway(GoAwayFrame frame) + { + boolean failStreams = false; + boolean tryRunZeroStreamsAction = false; + try (AutoLock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + { + goAwayRecv = frame; + if (frame.isGraceful()) + { + closed = CloseState.REMOTELY_CLOSED; + if (LOG.isDebugEnabled()) + LOG.debug("Waiting non-graceful GOAWAY for {}", HTTP2Session.this); + } + else + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + closed = CloseState.CLOSING; + GoAwayFrame goAwayFrame = goAwaySent; + zeroStreamsAction = () -> sendGoAwayAndTerminate(goAwayFrame, frame); + tryRunZeroStreamsAction = streamCount.get() == 0; + failStreams = true; + } + break; + } + case LOCALLY_CLOSED: + { + goAwayRecv = frame; + if (frame.isGraceful()) + { + // Wait for the non-graceful GOAWAY from the other peer. + if (LOG.isDebugEnabled()) + LOG.debug("Waiting non-graceful GOAWAY for {}", HTTP2Session.this); + } + else + { + closed = CloseState.CLOSING; + if (goAwaySent.isGraceful()) + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + GoAwayFrame goAwayFrame = goAwaySent; + zeroStreamsAction = () -> sendGoAwayAndTerminate(goAwayFrame, frame); + tryRunZeroStreamsAction = streamCount.get() == 0; + } + else + { + zeroStreamsAction = () -> terminate(frame); + tryRunZeroStreamsAction = streamCount.get() == 0; + failStreams = true; + } + } + break; + } + case REMOTELY_CLOSED: + { + if (frame.isGraceful()) + { + // Received a non-first, but graceful, GOAWAY, ignore it. + if (LOG.isDebugEnabled()) + LOG.debug("Already received, ignoring GOAWAY for {}", HTTP2Session.this); + } + else + { + goAwayRecv = frame; + closed = CloseState.CLOSING; + if (goAwaySent == null || goAwaySent.isGraceful()) + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + GoAwayFrame goAwayFrame = goAwaySent; + zeroStreamsAction = () -> sendGoAwayAndTerminate(goAwayFrame, frame); + } + else + { + zeroStreamsAction = () -> terminate(frame); + } + tryRunZeroStreamsAction = streamCount.get() == 0; + failStreams = true; + } + break; + } + default: + { + // Already closing or closed, ignore it. + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignored {} for {}", frame, HTTP2Session.this); + break; + } + } + } + + notifyGoAway(HTTP2Session.this, frame); + + if (failStreams) + { + // Must compare the lastStreamId only with local streams. + // For example, a client that sent request with streamId=137 may send a GOAWAY(4), + // where streamId=4 is the last stream pushed by the server to the client. + // The server must not compare its local streamId=4 with remote streamId=137. + Predicate failIf = stream -> stream.isLocal() && stream.getId() > frame.getLastStreamId(); + failStreams(failIf, "closing", false); + } + + if (tryRunZeroStreamsAction) + tryRunZeroStreamsAction(); + } + + private void onShutdown() + { + String reason = "input_shutdown"; + Throwable cause = null; + boolean failStreams = false; + try (AutoLock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + case LOCALLY_CLOSED: + { + if (LOG.isDebugEnabled()) + LOG.debug("Unexpected ISHUT for {}", HTTP2Session.this); + closed = CloseState.CLOSING; + failure = cause = new ClosedChannelException(); + break; + } + case REMOTELY_CLOSED: + { + closed = CloseState.CLOSING; + GoAwayFrame goAwayFrame = newGoAwayFrame(0, ErrorCode.NO_ERROR.code, reason); + zeroStreamsAction = () -> terminate(goAwayFrame); + failure = new ClosedChannelException(); + failStreams = true; + break; + } + case CLOSING: + { + if (failure == null) + failure = new ClosedChannelException(); + failStreams = true; + break; + } + default: + { + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignoring ISHUT for {}", HTTP2Session.this); + return; + } + } + } + + if (failStreams) + { + // Since nothing else will arrive from the other peer, reset + // the streams for which the other peer did not send all frames. + Predicate failIf = stream -> !stream.isRemotelyClosed(); + failStreams(failIf, reason, false); + tryRunZeroStreamsAction(); + } + else + { + GoAwayFrame goAwayFrame = newGoAwayFrame(0, ErrorCode.NO_ERROR.code, reason); + abort(reason, cause, Callback.from(() -> terminate(goAwayFrame))); + } + } + + private boolean onIdleTimeout() + { + String reason = "idle_timeout"; + boolean notify = false; + boolean sendGoAway = false; + GoAwayFrame goAwayFrame = null; + Throwable cause = null; + try (AutoLock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + { + long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime); + if (elapsed < endPoint.getIdleTimeout()) + return false; + notify = true; + break; + } + // Timed out while waiting for closing events, fail all the streams. + case LOCALLY_CLOSED: + { + if (goAwaySent.isGraceful()) + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason); + sendGoAway = true; + } + goAwayFrame = goAwaySent; + closed = CloseState.CLOSING; + zeroStreamsAction = null; + failure = cause = new TimeoutException("Session idle timeout expired"); + break; + } + case REMOTELY_CLOSED: + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason); + sendGoAway = true; + goAwayFrame = goAwaySent; + closed = CloseState.CLOSING; + zeroStreamsAction = null; + failure = cause = new TimeoutException("Session idle timeout expired"); + break; + } + default: + { + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignored idle timeout for {}", HTTP2Session.this); + return false; + } + } + } + + if (notify) + { + boolean confirmed = notifyIdleTimeout(HTTP2Session.this); + if (LOG.isDebugEnabled()) + LOG.debug("Idle timeout {} for {}", confirmed ? "confirmed" : "ignored", HTTP2Session.this); + if (confirmed) + halt(reason); + return false; + } + + failStreams(stream -> true, reason, true); + if (sendGoAway) + sendGoAway(goAwayFrame, Callback.NOOP); + notifyFailure(HTTP2Session.this, cause, Callback.NOOP); + terminate(goAwayFrame); + return false; + } + + private void onSessionFailure(int error, String reason, Callback callback) + { + GoAwayFrame goAwayFrame; + Throwable cause; + try (AutoLock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + case LOCALLY_CLOSED: + case REMOTELY_CLOSED: + { + // Send another GOAWAY with the error code. + goAwaySent = goAwayFrame = newGoAwayFrame(error, reason); + closed = CloseState.CLOSING; + zeroStreamsAction = null; + failure = cause = toFailure(error, reason); + break; + } + default: + { + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignored session failure {}", HTTP2Session.this, failure); + callback.succeeded(); + return; + } + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("Session failure {}", HTTP2Session.this, cause); + + failStreams(stream -> true, reason, true); + sendGoAway(goAwayFrame, Callback.NOOP); + notifyFailure(HTTP2Session.this, cause, Callback.NOOP); + terminate(goAwayFrame); + } + + private void onWriteFailure(Throwable x) + { + String reason = "write_failure"; + try (AutoLock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + case LOCALLY_CLOSED: + case REMOTELY_CLOSED: + { + closed = CloseState.CLOSING; + zeroStreamsAction = () -> + { + GoAwayFrame goAwayFrame = newGoAwayFrame(0, ErrorCode.NO_ERROR.code, reason); + terminate(goAwayFrame); + }; + failure = x; + break; + } + default: + { + return; + } + } + } + abort(reason, x, Callback.from(this::tryRunZeroStreamsAction)); + } + + private void sendGoAwayAndTerminate(GoAwayFrame frame, GoAwayFrame eventFrame) + { + sendGoAway(frame, Callback.from(() -> terminate(eventFrame))); + } + + private void sendGoAway(GoAwayFrame frame, Callback callback) + { + control(null, callback, frame); + } + + private void onStreamCreated() + { + streamCount.incrementAndGet(); + } + + private void onStreamDestroyed() + { + long count = streamCount.decrementAndGet(); + // I've seen zero here, but it may increase again. + // That's why tryRunZeroStreamsAction() must check + // the count with the lock held. + if (count == 0) + tryRunZeroStreamsAction(); + } + + private void tryRunZeroStreamsAction() + { + // Threads from onStreamClosed() and other events + // such as onGoAway() may be in a race to finish, + // but only one moves to CLOSED and runs the action. + Runnable action = null; + CompletableFuture future; + try (AutoLock l = lock.lock()) + { + long count = streamCount.get(); + if (count > 0) + { + if (LOG.isDebugEnabled()) + LOG.debug("Deferred closing action, {} pending streams on {}", count, HTTP2Session.this); + return; + } + + future = shutdownCallback; + + switch (closed) + { + case LOCALLY_CLOSED: + { + if (goAwaySent.isGraceful()) + { + action = zeroStreamsAction; + zeroStreamsAction = null; + } + break; + } + case REMOTELY_CLOSED: + { + if (goAwaySent != null && goAwaySent.isGraceful()) + { + action = zeroStreamsAction; + zeroStreamsAction = null; + } + break; + } + case CLOSING: + { + closed = CloseState.CLOSED; + action = zeroStreamsAction; + zeroStreamsAction = null; + break; + } + default: + { + break; + } + } + } + if (action != null) + { + if (LOG.isDebugEnabled()) + LOG.debug("Executing zero streams action on {}", HTTP2Session.this); + action.run(); + } + if (future != null) + future.complete(null); + } + + private void terminate(GoAwayFrame frame) + { + if (LOG.isDebugEnabled()) + LOG.debug("Terminating {}", HTTP2Session.this); + HTTP2Session.this.terminate(failure); + notifyClose(HTTP2Session.this, frame, Callback.NOOP); + } private int priority(PriorityFrame frame, Callback callback) { Slot slot = new Slot(); int currentStreamId = frame.getStreamId(); - int streamId = reserveSlot(slot, currentStreamId); - - if (currentStreamId <= 0) - frame = frame.withStreamId(streamId); - - assignSlotAndFlush(slot, newEntry(frame, null, callback)); + int streamId = reserveSlot(slot, currentStreamId, callback::failed); + if (streamId > 0) + { + if (currentStreamId <= 0) + frame = frame.withStreamId(streamId); + slot.entries = List.of(newEntry(frame, null, Callback.from(callback::succeeded, x -> + { + HTTP2Session.this.onStreamDestroyed(streamId); + callback.failed(x); + }))); + flush(); + } return streamId; } - private void newStream(IStream.FrameList frameList, Promise promise, Stream.Listener listener) + private void newLocalStream(IStream.FrameList frameList, Promise promise, Stream.Listener listener) { Slot slot = new Slot(); int currentStreamId = frameList.getStreamId(); - int streamId = reserveSlot(slot, currentStreamId); - - List frames = frameList.getFrames(); - if (currentStreamId <= 0) + int streamId = reserveSlot(slot, currentStreamId, promise::failed); + if (streamId > 0) { - frames = frames.stream() - .map(frame -> frame.withStreamId(streamId)) - .collect(Collectors.toList()); + List frames = frameList.getFrames(); + if (currentStreamId <= 0) + { + frames = frames.stream() + .map(frame -> frame.withStreamId(streamId)) + .collect(Collectors.toList()); + } + if (createLocalStream(slot, frames, promise, listener, streamId)) + return; + freeSlot(slot, streamId); } + } - try + private void newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Promise promise) + { + int streamId; + try (AutoLock l = lock.lock()) + { + streamId = localStreamIds.getAndAdd(2); + HTTP2Session.this.onStreamCreated(streamId); + } + IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)frame.getMetaData(), new Promise<>() + { + @Override + public void failed(Throwable x) + { + HTTP2Session.this.onStreamDestroyed(streamId); + promise.failed(x); + } + }); + if (stream != null) { - HeadersFrame headersFrame = (HeadersFrame)frameList.getFrames().get(0); - IStream stream = HTTP2Session.this.createLocalStream(streamId, (MetaData.Request)headersFrame.getMetaData()); stream.setListener(listener); - stream.process(new PrefaceFrame(), Callback.NOOP); - - int count = frames.size(); - Callback streamCallback = new StreamPromiseCallback(promise, stream); - Callback callback = count == 1 ? streamCallback : new CountingCallback(streamCallback, count); - List entries = frames.stream() - .map(frame -> newEntry(frame, stream, callback)) - .collect(Collectors.toList()); - assignSlotAndFlush(slot, entries); + stream.updateClose(frame.isEndStream(), CloseState.Event.AFTER_SEND); } - catch (Throwable x) + } + + private boolean newRemoteStream(int streamId) + { + try (AutoLock l = lock.lock()) { - releaseSlotFlushAndFail(slot, promise, x); + switch (closed) + { + case NOT_CLOSED: + { + HTTP2Session.this.onStreamCreated(streamId); + return true; + } + case LOCALLY_CLOSED: + { + // SPEC: streams larger than GOAWAY's lastStreamId are dropped. + if (streamId <= goAwaySent.getLastStreamId()) + { + // Allow creation of streams that may have been in-flight. + HTTP2Session.this.onStreamCreated(streamId); + return true; + } + return false; + } + default: + return false; + } } } private void push(PushPromiseFrame frame, Promise promise, Stream.Listener listener) { Slot slot = new Slot(); - int streamId = reserveSlot(slot, 0); - frame = frame.withStreamId(streamId); - - try + int streamId = reserveSlot(slot, 0, promise::failed); + if (streamId > 0) { - IStream stream = HTTP2Session.this.createLocalStream(streamId, frame.getMetaData()); - stream.setListener(listener); - assignSlotAndFlush(slot, newEntry(frame, stream, new StreamPromiseCallback(promise, stream))); - } - catch (Throwable x) - { - releaseSlotFlushAndFail(slot, promise, x); + frame = frame.withStreamId(streamId); + if (createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId)) + return; + freeSlot(slot, streamId); } } - private int reserveSlot(Slot slot, int streamId) + private boolean createLocalStream(Slot slot, List frames, Promise promise, Stream.Listener listener, int streamId) { - if (streamId <= 0) + MetaData.Request request = extractMetaDataRequest(frames.get(0)); + if (request == null) + return false; + IStream stream = HTTP2Session.this.createLocalStream(streamId, request, promise); + if (stream == null) + return false; + + stream.setListener(listener); + stream.process(new PrefaceFrame(), Callback.NOOP); + + Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x -> { - try (AutoLock l = lock.lock()) - { - streamId = localStreamIds.getAndAdd(2); - slots.offer(slot); - } + HTTP2Session.this.onStreamDestroyed(streamId); + promise.failed(x); + }); + int count = frames.size(); + if (count == 1) + { + slot.entries = List.of(newEntry(frames.get(0), stream, streamCallback)); } else { - try (AutoLock l = lock.lock()) + Callback callback = new CountingCallback(streamCallback, count); + slot.entries = frames.stream() + .map(frame -> newEntry(frame, stream, callback)) + .collect(Collectors.toList()); + } + flush(); + return true; + } + + private MetaData.Request extractMetaDataRequest(StreamFrame frame) + { + if (frame instanceof HeadersFrame) + return (MetaData.Request)((HeadersFrame)frame).getMetaData(); + if (frame instanceof PushPromiseFrame) + return ((PushPromiseFrame)frame).getMetaData(); + return null; + } + + private int reserveSlot(Slot slot, int streamId, Consumer fail) + { + Throwable failure = null; + try (AutoLock l = lock.lock()) + { + if (closed == CloseState.NOT_CLOSED) { + if (streamId <= 0) + { + streamId = localStreamIds.getAndAdd(2); + HTTP2Session.this.onStreamCreated(streamId); + } slots.offer(slot); } + else + { + failure = this.failure; + if (failure == null) + failure = new IllegalStateException("session closed"); + } } - return streamId; + if (failure == null) + return streamId; + fail.accept(failure); + return 0; } - private void assignSlotAndFlush(Slot slot, HTTP2Flusher.Entry entry) - { - assignSlotAndFlush(slot, List.of(entry)); - } - - private void assignSlotAndFlush(Slot slot, List entries) - { - // Every time a slot entry is assigned, we must flush. - slot.entries = entries; - flush(); - } - - private void releaseSlotFlushAndFail(Slot slot, Promise promise, Throwable x) + private void freeSlot(Slot slot, int streamId) { try (AutoLock l = lock.lock()) { slots.remove(slot); } + HTTP2Session.this.onStreamDestroyed(streamId); flush(); - promise.failed(x); } /** @@ -1946,6 +2300,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio flusher.iterate(); } + @Override + public String toString() + { + try (AutoLock l = lock.lock()) + { + return String.format("state=[streams=%d,%s,goAwayRecv=%s,goAwaySent=%s,failure=%s]", + streamCount.get(), + closed, + goAwayRecv, + goAwaySent, + failure + ); + } + } + private class Slot { private volatile List entries; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index 7040c82407a..952e7f20afd 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -158,7 +158,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa localReset = true; failure = new EOFException("reset"); } - session.frames(this, List.of(frame), callback); + ((HTTP2Session)session).reset(this, frame, callback); } private boolean startWrite(Callback callback) @@ -367,24 +367,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa length = fields.getLongField(HttpHeader.CONTENT_LENGTH); dataLength = length >= 0 ? length : Long.MIN_VALUE; } - - if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) - session.removeStream(this); - callback.succeeded(); } private void onData(DataFrame frame, Callback callback) { - if (getRecvWindow() < 0) - { - // It's a bad client, it does not deserve to be - // treated gently by just resetting the stream. - ((HTTP2Session)session).onConnectionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded"); - callback.failed(new IOException("stream_window_exceeded")); - return; - } - // SPEC: remotely closed streams must be replied with a reset. if (isRemotelyClosed()) { @@ -483,9 +470,10 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa dataEntry = dataQueue.poll(); } DataFrame frame = dataEntry.frame; - if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED)) - session.removeStream(this); + boolean closed = updateClose(frame.isEndStream(), CloseState.Event.RECEIVED); notifyDataDemanded(this, frame, dataEntry.callback); + if (closed) + session.removeStream(this); } } @@ -505,8 +493,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa failure = new EofException("reset"); } close(); - session.removeStream(this); - notifyReset(this, frame, callback); + if (session.removeStream(this)) + notifyReset(this, frame, callback); } private void onPush(PushPromiseFrame frame, Callback callback) @@ -529,8 +517,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa failure = frame.getFailure(); } close(); - session.removeStream(this); - notifyFailure(this, frame, callback); + if (session.removeStream(this)) + notifyFailure(this, frame, callback); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java index 1fcc716021c..6b3b32c2ce2 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java @@ -45,8 +45,9 @@ public interface ISession extends Session *

Removes the given {@code stream}.

* * @param stream the stream to remove + * @return whether the stream was removed */ - public void removeStream(IStream stream); + public boolean removeStream(IStream stream); /** *

Sends the given list of frames to create a new {@link Stream}.

diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java index 19f11d7b2ec..b6ebfee783e 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java @@ -113,8 +113,6 @@ public interface Session /** *

Closes the session by sending a GOAWAY frame with the given error code * and payload.

- *

The GOAWAY frame is sent only once; subsequent or concurrent attempts to - * close the session will have no effect.

* * @param error the error code * @param payload an optional payload (may be null) @@ -225,6 +223,16 @@ public interface Session * * @param session the session * @param frame the GOAWAY frame received + */ + default void onGoAway(Session session, GoAwayFrame frame) + { + } + + /** + *

Callback method invoked when a GOAWAY frame caused the session to be closed.

+ * + * @param session the session + * @param frame the GOAWAY frame that caused the session to be closed * @param callback the callback to notify of the GOAWAY processing */ public default void onClose(Session session, GoAwayFrame frame, Callback callback) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/GoAwayFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/GoAwayFrame.java index 1b006eef4c3..609ee0c152a 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/GoAwayFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/GoAwayFrame.java @@ -20,30 +20,33 @@ package org.eclipse.jetty.http2.frames; import java.nio.charset.StandardCharsets; -import org.eclipse.jetty.http2.CloseState; import org.eclipse.jetty.http2.ErrorCode; public class GoAwayFrame extends Frame { - private final CloseState closeState; + public static final GoAwayFrame GRACEFUL = new GoAwayFrame(Integer.MAX_VALUE, ErrorCode.NO_ERROR.code, new byte[]{'g', 'r', 'a', 'c', 'e', 'f', 'u', 'l'}); + private final int lastStreamId; private final int error; private final byte[] payload; public GoAwayFrame(int lastStreamId, int error, byte[] payload) - { - this(CloseState.REMOTELY_CLOSED, lastStreamId, error, payload); - } - - public GoAwayFrame(CloseState closeState, int lastStreamId, int error, byte[] payload) { super(FrameType.GO_AWAY); - this.closeState = closeState; this.lastStreamId = lastStreamId; this.error = error; this.payload = payload; } + /** + * @return whether this GOAWAY frame is graceful, i.e. its {@code lastStreamId == Integer.MAX_VALUE} + */ + public boolean isGraceful() + { + // SPEC: section 6.8. + return lastStreamId == Integer.MAX_VALUE; + } + public int getLastStreamId() { return lastStreamId; @@ -76,11 +79,10 @@ public class GoAwayFrame extends Frame @Override public String toString() { - return String.format("%s,%d/%s/%s/%s", + return String.format("%s{%d/%s/%s}", super.toString(), lastStreamId, ErrorCode.toString(error, null), - tryConvertPayload(), - closeState); + tryConvertPayload()); } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 814105919e4..c469f592321 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -40,13 +40,13 @@ import org.eclipse.jetty.client.SendFailure; import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; -import org.eclipse.jetty.http2.CloseState; import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.HTTP2Session; -import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.thread.Sweeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,29 +99,43 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S public void upgrade(Map context) { - HttpResponse response = (HttpResponse)context.get(HttpResponse.class.getName()); - HttpRequest request = (HttpRequest)response.getRequest(); // In case of HTTP/1.1 upgrade to HTTP/2, the request is HTTP/1.1 // (with upgrade) for a resource, and the response is HTTP/2. - // Create the implicit stream#1 so that it can receive the HTTP/2 response. - MetaData.Request metaData = new MetaData.Request(request.getMethod(), HttpURI.from(request.getURI()), HttpVersion.HTTP_2, request.getHeaders()); - // We do not support upgrade requests with content, so endStream=true. - HeadersFrame frame = new HeadersFrame(metaData, null, true); - IStream stream = ((HTTP2Session)session).newLocalStream(frame, null); - stream.updateClose(frame.isEndStream(), CloseState.Event.AFTER_SEND); + + HttpResponse response = (HttpResponse)context.get(HttpResponse.class.getName()); + HttpRequest request = (HttpRequest)response.getRequest(); HttpExchange exchange = request.getConversation().getExchanges().peekLast(); HttpChannelOverHTTP2 http2Channel = acquireHttpChannel(); activeChannels.add(http2Channel); HttpExchange newExchange = new HttpExchange(exchange.getHttpDestination(), exchange.getRequest(), List.of()); http2Channel.associate(newExchange); - stream.setListener(http2Channel.getStreamListener()); - http2Channel.setStream(stream); - newExchange.requestComplete(null); - newExchange.terminateRequest(); - if (LOG.isDebugEnabled()) - LOG.debug("Upgrade completed for {}", this); + // Create the implicit stream#1 so that it can receive the HTTP/2 response. + MetaData.Request metaData = new MetaData.Request(request.getMethod(), HttpURI.from(request.getURI()), HttpVersion.HTTP_2, request.getHeaders()); + // We do not support upgrade requests with content, so endStream=true. + HeadersFrame frame = new HeadersFrame(metaData, null, true); + ((HTTP2Session)session).newUpgradeStream(frame, http2Channel.getStreamListener(), new Promise<>() + { + @Override + public void succeeded(Stream stream) + { + http2Channel.setStream(stream); + newExchange.requestComplete(null); + newExchange.terminateRequest(); + if (LOG.isDebugEnabled()) + LOG.debug("Upgrade succeeded for {}", HttpConnectionOverHTTP2.this); + } + + @Override + public void failed(Throwable failure) + { + newExchange.requestComplete(failure); + newExchange.terminateRequest(); + if (LOG.isDebugEnabled()) + LOG.debug("Upgrade failed for {}", HttpConnectionOverHTTP2.this); + } + }); } @Override diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java index 197e4e5ec20..fe3672c2846 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java @@ -35,6 +35,7 @@ public class AbstractTest { protected Server server; protected ServerConnector connector; + protected HTTP2Client http2Client; protected HttpClient client; protected void start(ServerSessionListener listener) throws Exception @@ -63,12 +64,13 @@ public class AbstractTest server.addConnector(connector); } - protected void prepareClient() throws Exception + protected void prepareClient() { - client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client())); + http2Client = new HTTP2Client(); + client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client)); QueuedThreadPool clientExecutor = new QueuedThreadPool(); clientExecutor.setName("client"); - client.setExecutor(clientExecutor); + this.client.setExecutor(clientExecutor); } @AfterEach diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java index 48077191c22..ff8124202cd 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java @@ -226,7 +226,9 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest MetaData.Request request = (MetaData.Request)frame.getMetaData(); if (HttpMethod.HEAD.is(request.getMethod())) { - stream.getSession().close(ErrorCode.REFUSED_STREAM_ERROR.code, null, Callback.NOOP); + int error = ErrorCode.REFUSED_STREAM_ERROR.code; + stream.reset(new ResetFrame(stream.getId(), error), Callback.NOOP); + stream.getSession().close(error, null, Callback.NOOP); } else { diff --git a/jetty-http2/http2-http-client-transport/src/test/resources/jetty-logging.properties b/jetty-http2/http2-http-client-transport/src/test/resources/jetty-logging.properties index 1cec8b3203e..b597ee2c29c 100644 --- a/jetty-http2/http2-http-client-transport/src/test/resources/jetty-logging.properties +++ b/jetty-http2/http2-http-client-transport/src/test/resources/jetty-logging.properties @@ -1,6 +1,6 @@ # Jetty Logging using jetty-slf4j-impl #org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.client.LEVEL=DEBUG -org.eclipse.jetty.http2.hpack.LEVEL=INFO #org.eclipse.jetty.http2.LEVEL=DEBUG +org.eclipse.jetty.http2.hpack.LEVEL=INFO #org.eclipse.jetty.io.ssl.LEVEL=DEBUG diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java index 4e23fd014f8..fc4f57b0db9 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerSession.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.CloseState; import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Session; @@ -65,18 +66,12 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis settings = Collections.emptyMap(); SettingsFrame settingsFrame = new SettingsFrame(settings, false); - WindowUpdateFrame windowFrame = null; int sessionWindow = getInitialSessionRecvWindow() - FlowControlStrategy.DEFAULT_WINDOW_SIZE; + updateRecvWindow(sessionWindow); if (sessionWindow > 0) - { - updateRecvWindow(sessionWindow); - windowFrame = new WindowUpdateFrame(0, sessionWindow); - } - - if (windowFrame == null) - frames(null, List.of(settingsFrame), Callback.NOOP); + frames(null, List.of(settingsFrame, new WindowUpdateFrame(0, sessionWindow)), Callback.NOOP); else - frames(null, List.of(settingsFrame, windowFrame), Callback.NOOP); + frames(null, List.of(settingsFrame), Callback.NOOP); } @Override @@ -105,31 +100,26 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis } else { - if (isClosed()) + stream = createRemoteStream(streamId, (MetaData.Request)metaData); + if (stream != null) { - updateLastRemoteStreamId(streamId); - reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP); - } - else - { - stream = createRemoteStream(streamId, (MetaData.Request)metaData); - if (stream != null) + onStreamOpened(stream); + + if (metaData instanceof MetaData.ConnectRequest) { - onStreamOpened(stream); - - if (metaData instanceof MetaData.ConnectRequest) + if (!isConnectProtocolEnabled() && ((MetaData.ConnectRequest)metaData).getProtocol() != null) { - if (!isConnectProtocolEnabled() && ((MetaData.ConnectRequest)metaData).getProtocol() != null) - { - stream.reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR.code), Callback.NOOP); - return; - } + stream.reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR.code), Callback.NOOP); + return; } - - stream.process(frame, Callback.NOOP); - Stream.Listener listener = notifyNewStream(stream, frame); - stream.setListener(listener); } + + stream.process(frame, Callback.NOOP); + boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED); + Stream.Listener listener = notifyNewStream(stream, frame); + stream.setListener(listener); + if (closed) + removeStream(stream); } } } @@ -148,7 +138,10 @@ public class HTTP2ServerSession extends HTTP2Session implements ServerParser.Lis if (stream != null) { stream.process(frame, Callback.NOOP); + boolean closed = stream.updateClose(frame.isEndStream(), CloseState.Event.RECEIVED); notifyHeaders(stream, frame); + if (closed) + removeStream(stream); } else { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java index c4586d17ab9..0eac1c7409a 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java @@ -109,6 +109,12 @@ public class RawHTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnecti delegate.onReset(session, frame); } + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + delegate.onGoAway(session, frame); + } + @Override public void onClose(Session session, GoAwayFrame frame) { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index 2811c4c4aad..958d306d780 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -492,6 +492,6 @@ public abstract class IteratingCallback implements Callback @Override public String toString() { - return String.format("%s[%s]", super.toString(), _state); + return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), _state); } }