From 226d616a8a257a711d354b229149207c6f655d2f Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 10 Nov 2020 10:07:19 +0100 Subject: [PATCH] Issue 5310 - Review HTTP/2 GOAWAY handling. Reimplemented close/idle_timeout/stop/onGoAway/input_shutdown following more closely the specification. In particular, the semantic of sending a GOAWAY is now to: * stop creation of new both local and remote streams * record the last processed stream * continue processing streams that are pending This means that a GOAWAY is "graceful" in the sense that it allows for streams to be completed by applications. The semantic of stop() and idle timeout is harsher: for pending streams a RST_STREAM is sent to the other peer and they are failed locally. Added support for GOAWAY with 2^31-1 lastStreamId. Added support for a peer to send and receive multiple GOAWAY frames. Reviewed the stream creation/destruction mechanism so that when the last stream completes after a GOAWAY, proper actions can be run to tear down the connection. Signed-off-by: Simone Bordet --- .../client/HTTP2ClientConnectionFactory.java | 14 - .../jetty/http2/client/AsyncServletTest.java | 9 +- .../http2/client/FlowControlStrategyTest.java | 56 +- .../jetty/http2/client/GoAwayTest.java | 963 +++++++++++++ .../http2/client/SessionFailureTest.java | 4 +- .../jetty/http2/client/StreamCloseTest.java | 4 +- .../eclipse/jetty/http2/HTTP2Connection.java | 2 +- .../org/eclipse/jetty/http2/HTTP2Flusher.java | 4 +- .../org/eclipse/jetty/http2/HTTP2Session.java | 1266 +++++++++++------ .../org/eclipse/jetty/http2/HTTP2Stream.java | 18 +- .../org/eclipse/jetty/http2/ISession.java | 3 +- .../org/eclipse/jetty/http2/api/Session.java | 12 +- .../jetty/http2/frames/GoAwayFrame.java | 24 +- .../jetty/http2/client/http/AbstractTest.java | 8 +- .../HttpClientTransportOverHTTP2Test.java | 4 +- .../RawHTTP2ServerConnectionFactory.java | 6 + .../eclipse/jetty/util/IteratingCallback.java | 2 +- 17 files changed, 1882 insertions(+), 517 deletions(-) create mode 100644 jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 870788cb399..388599be9b1 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -92,20 +92,6 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory this.listener = listener; } - @Override - public long getMessagesIn() - { - HTTP2ClientSession session = (HTTP2ClientSession)getSession(); - return session.getStreamsOpened(); - } - - @Override - public long getMessagesOut() - { - HTTP2ClientSession session = (HTTP2ClientSession)getSession(); - return session.getStreamsClosed(); - } - @Override public void onOpen() { diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java index 7c971a452d1..140b2862a54 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncServletTest.java @@ -136,7 +136,7 @@ public class AsyncServletTest extends AbstractTest HeadersFrame frame = new HeadersFrame(metaData, null, true); FuturePromise promise = new FuturePromise<>(); CountDownLatch responseLatch = new CountDownLatch(1); - CountDownLatch resetLatch = new CountDownLatch(1); + CountDownLatch failLatch = new CountDownLatch(1); session.newStream(frame, promise, new Stream.Listener.Adapter() { @Override @@ -146,9 +146,10 @@ public class AsyncServletTest extends AbstractTest } @Override - public void onReset(Stream stream, ResetFrame frame) + public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) { - resetLatch.countDown(); + failLatch.countDown(); + callback.succeeded(); } }); Stream stream = promise.get(5, TimeUnit.SECONDS); @@ -156,7 +157,7 @@ public class AsyncServletTest extends AbstractTest assertTrue(serverLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); assertFalse(responseLatch.await(idleTimeout + 1000, TimeUnit.MILLISECONDS)); - assertTrue(resetLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + assertTrue(failLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); } @Test diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java index 5d7a156e11d..3100f66078f 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStrategyTest.java @@ -731,6 +731,7 @@ public abstract class FlowControlStrategyTest public void testClientExceedingSessionWindow() throws Exception { // On server, we don't consume the data. + CountDownLatch serverCloseLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() { @Override @@ -745,16 +746,29 @@ public abstract class FlowControlStrategyTest } }; } - }); - CountDownLatch closeLatch = new CountDownLatch(1); - Session session = newClient(new Session.Listener.Adapter() - { @Override public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) { if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code) - closeLatch.countDown(); + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); } }); @@ -800,13 +814,16 @@ public abstract class FlowControlStrategyTest http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0])); // Expect the connection to be closed. - assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); } @Test public void testClientExceedingStreamWindow() throws Exception { // On server, we don't consume the data. + CountDownLatch serverCloseLatch = new CountDownLatch(1); start(new ServerSessionListener.Adapter() { @Override @@ -829,16 +846,29 @@ public abstract class FlowControlStrategyTest } }; } - }); - CountDownLatch closeLatch = new CountDownLatch(1); - Session session = newClient(new Session.Listener.Adapter() - { @Override public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) { if (frame.getError() == ErrorCode.FLOW_CONTROL_ERROR.code) - closeLatch.countDown(); + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); } }); @@ -880,7 +910,9 @@ public abstract class FlowControlStrategyTest http2Session.getEndPoint().write(Callback.NOOP, buffers.toArray(new ByteBuffer[0])); // Expect the connection to be closed. - assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); } @Test diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java new file mode 100644 index 00000000000..a2c2345d6e2 --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/GoAwayTest.java @@ -0,0 +1,963 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.CloseState; +import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.HTTP2Session; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.GoAwayFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.component.LifeCycle; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class GoAwayTest extends AbstractTest +{ + @Test + public void testClientGoAwayServerReplies() throws Exception + { + CountDownLatch serverLatch = new CountDownLatch(1); + AtomicReference serverSessionRef = new AtomicReference<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverSessionRef.set(stream.getSession()); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + return null; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverLatch.countDown(); + } + }); + + CountDownLatch clientLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientLatch.countDown(); + } + }); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientSession.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + } + }); + + Assertions.assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertSame(CloseState.CLOSED, ((HTTP2Session)serverSessionRef.get()).getCloseState()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + Assertions.assertSame(CloseState.CLOSED, ((HTTP2Session)clientSession).getCloseState()); + } + + @Test + public void testServerGoAwayWithInFlightStreamClientFailsStream() throws Exception + { + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverSessionRef.set(stream.getSession()); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + MetaData.Request request1 = newRequest(HttpMethod.GET.asString(), new HttpFields()); + CountDownLatch streamFailureLatch = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request1, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + // Simulate the server closing while the client sends a second request. + // The server sends a lastStreamId for the first request, and discards the second. + serverSessionRef.get().close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + // The client sends the second request and should eventually fail it + // locally since it has a larger streamId, and the server discarded it. + MetaData.Request request2 = newRequest(HttpMethod.GET.asString(), new HttpFields()); + clientSession.newStream(new HeadersFrame(request2, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) + { + streamFailureLatch.countDown(); + callback.succeeded(); + } + }); + } + }); + + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); +// Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAway() throws Exception + { + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + AtomicReference serverSessionRef = new AtomicReference<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverSessionRef.set(stream.getSession()); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (!frame.isGraceful()) + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + clientGracefulGoAwayLatch.countDown(); + else + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (!frame.isGraceful()) + clientCloseLatch.countDown(); + } + }); + CountDownLatch clientLatch = new CountDownLatch(1); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + } + }); + + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + + // Send a graceful GOAWAY from the server. + // Because the server had no pending streams, it will send also a non-graceful GOAWAY. + ((HTTP2Session)serverSessionRef.get()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception + { + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + AtomicReference serverSessionRef = new AtomicReference<>(); + AtomicReference serverStreamRef = new AtomicReference<>(); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + Session session = stream.getSession(); + serverSessionRef.set(session); + + // Send a graceful GOAWAY while processing a stream. + ((HTTP2Session)session).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (!frame.isGraceful()) + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + clientGracefulGoAwayLatch.countDown(); + else + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + if (!frame.isGraceful()) + clientCloseLatch.countDown(); + } + }); + CountDownLatch clientLatch = new CountDownLatch(1); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + } + }); + + // Wait for the graceful GOAWAY. + Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + + // Now the client cannot create new streams. + FuturePromise streamPromise = new FuturePromise<>(); + clientSession.newStream(new HeadersFrame(newRequest(HttpMethod.GET.asString(), new HttpFields()), null, true), streamPromise, null); + Assertions.assertThrows(ExecutionException.class, () -> streamPromise.get(5, TimeUnit.SECONDS)); + + // The client must not reply to a graceful GOAWAY. + Assertions.assertFalse(serverGoAwayLatch.await(1, TimeUnit.SECONDS)); + + // Previous streams must complete successfully. + Stream serverStream = serverStreamRef.get(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP); + + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + + // The server should have sent the GOAWAY after the last stream completed. + + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testClientGoAwayWithStreamsServerClosesWhenLastStreamCloses() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverStreamLatch = new CountDownLatch(1); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + serverStreamLatch.countDown(); + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + CountDownLatch clientLatch = new CountDownLatch(1); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + } + }); + + Assertions.assertTrue(serverStreamLatch.await(5, TimeUnit.SECONDS)); + + // The client sends a GOAWAY. + clientSession.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + + // The client must not receive a GOAWAY until the all streams are completed. + Assertions.assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS)); + + // Complete the stream. + Stream serverStream = serverStreamRef.get(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP); + + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAwayWithStreamsClientGoAwayServerClosesWhenLastStreamCloses() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverStreamLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + serverStreamLatch.countDown(); + + // Send a graceful GOAWAY while processing a stream. + ((HTTP2Session)stream.getSession()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + return null; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + { + // Send a GOAWAY when receiving a graceful GOAWAY. + session.close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + } + else + { + clientGoAwayLatch.countDown(); + } + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + CountDownLatch clientLatch = new CountDownLatch(1); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); + clientSession.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + if (frame.isEndStream() && response.getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + } + }); + + // The server has a pending stream, so it does not send the non-graceful GOAWAY yet. + Assertions.assertFalse(clientGoAwayLatch.await(1, TimeUnit.SECONDS)); + + // Complete the stream, the server should send the non-graceful GOAWAY. + Stream serverStream = serverStreamRef.get(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + serverStream.headers(new HeadersFrame(serverStream.getId(), response, null, true), Callback.NOOP); + + // The server already received the client GOAWAY, + // so completing the last stream produces a close event. + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + // The client should receive the server non-graceful GOAWAY. + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverStream.getSession()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testClientGracefulGoAwayWithStreamsServerGracefulGoAwayServerClosesWhenLastStreamCloses() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set(stream); + return new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + if (frame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), callback); + } + else + { + callback.succeeded(); + } + } + }; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + { + // Send a graceful GOAWAY. + ((HTTP2Session)session).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + } + else + { + serverGoAwayLatch.countDown(); + } + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + clientGracefulGoAwayLatch.countDown(); + else + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); + FuturePromise promise = new FuturePromise<>(); + clientSession.newStream(new HeadersFrame(request, null, false), promise, new Stream.Listener.Adapter()); + Stream clientStream = promise.get(5, TimeUnit.SECONDS); + + // Send a graceful GOAWAY from the client. + ((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + // The server should send a graceful GOAWAY. + Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + + // Complete the stream. + clientStream.data(new DataFrame(clientStream.getId(), BufferUtil.EMPTY_BUFFER, true), Callback.NOOP); + + // Both client and server should send a non-graceful GOAWAY. + Assertions.assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverStreamRef.get().getSession()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testClientShutdownServerCloses() throws Exception + { + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverSessionRef.set(session); + serverCloseLatch.countDown(); + } + }); + + Session clientSession = newClient(new Session.Listener.Adapter()); + // Wait for the SETTINGS frames to be exchanged. + Thread.sleep(500); + + ((HTTP2Session)clientSession).getEndPoint().close(); + + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAwayClientShutdownServerCloses() throws Exception + { + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + // Reply to the graceful GOAWAY from the server with a TCP close. + ((HTTP2Session)session).getEndPoint().close(); + } + }); + // Wait for the SETTINGS frames to be exchanged. + Thread.sleep(500); + + // Send a graceful GOAWAY to the client. + ((HTTP2Session)serverSessionRef.get()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + } + + @Test + public void testServerIdleTimeout() throws Exception + { + long idleTimeout = 1000; + + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + ((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout); + } + + @Override + public boolean onIdleTimeout(Session session) + { + serverIdleTimeoutLatch.countDown(); + return true; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + Assertions.assertTrue(serverIdleTimeoutLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // Server should send a GOAWAY to the client. + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + // The client replied to server's GOAWAY, but the server already closed. + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGracefulGoAwayWithStreamsServerIdleTimeout() throws Exception + { + long idleTimeout = 1000; + + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + ((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout); + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + stream.setIdleTimeout(10 * idleTimeout); + // Send a graceful GOAWAY. + ((HTTP2Session)stream.getSession()).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + return null; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + clientGracefulGoAwayLatch.countDown(); + else + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); + // Send request headers but not data. + clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter()); + + Assertions.assertTrue(clientGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + // Server idle timeout sends a non-graceful GOAWAY. + Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testClientGracefulGoAwayWithStreamsServerIdleTimeout() throws Exception + { + long idleTimeout = 1000; + + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverGracefulGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + ((HTTP2Session)session).getEndPoint().setIdleTimeout(idleTimeout); + } + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + stream.setIdleTimeout(10 * idleTimeout); + return null; + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + if (frame.isGraceful()) + serverGracefulGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); + CountDownLatch streamFailureLatch = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) + { + streamFailureLatch.countDown(); + callback.succeeded(); + } + }); + + // Client sends a graceful GOAWAY. + ((HTTP2Session)clientSession).goAway(GoAwayFrame.GRACEFUL, Callback.NOOP); + + Assertions.assertTrue(serverGracefulGoAwayLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientGoAwayLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + Assertions.assertTrue(streamFailureLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } + + @Test + public void testServerGoAwayWithStreamsThenStop() throws Exception + { + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverCloseLatch = new CountDownLatch(1); + start(new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverSessionRef.set(stream.getSession()); + // Don't reply, don't reset the stream, just send the GOAWAY. + stream.getSession().close(ErrorCode.NO_ERROR.code, "close", Callback.NOOP); + return null; + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + serverCloseLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientCloseLatch = new CountDownLatch(1); + Session clientSession = newClient(new Session.Listener.Adapter() + { + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onClose(Session session, GoAwayFrame frame) + { + clientCloseLatch.countDown(); + } + }); + + MetaData.Request request = newRequest(HttpMethod.GET.asString(), new HttpFields()); + CountDownLatch clientResetLatch = new CountDownLatch(1); + clientSession.newStream(new HeadersFrame(request, null, false), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onReset(Stream stream, ResetFrame frame) + { + clientResetLatch.countDown(); + } + }); + + Assertions.assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + + // Neither the client nor the server are finishing + // the pending stream, so force the stop on the server. + LifeCycle.stop(serverSessionRef.get()); + + // The server should reset all the pending streams. + Assertions.assertTrue(clientResetLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(serverCloseLatch.await(5, TimeUnit.SECONDS)); + Assertions.assertTrue(clientCloseLatch.await(5, TimeUnit.SECONDS)); + + Assertions.assertFalse(((HTTP2Session)serverSessionRef.get()).getEndPoint().isOpen()); + Assertions.assertFalse(((HTTP2Session)clientSession).getEndPoint().isOpen()); + } +} diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java index 5094e5846af..bf3e886ff39 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/SessionFailureTest.java @@ -84,8 +84,8 @@ public class SessionFailureTest extends AbstractTest @Override public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) { - // Forcibly close the connection. - ((HTTP2Session)stream.getSession()).getEndPoint().close(); + // Forcibly shutdown the output to fail the write below. + ((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput(); // Now try to write something: it should fail. stream.headers(frame, new Callback() { diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java index c2ecfc78bd2..a955cbcda54 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/StreamCloseTest.java @@ -321,7 +321,9 @@ public class StreamCloseTest extends AbstractTest MetaData.Request request = (MetaData.Request)frame.getMetaData(); if ("GET".equals(request.getMethod())) { - ((HTTP2Session)stream.getSession()).getEndPoint().close(); + // Only shutdown the output, since closing the EndPoint causes a call to + // stop() on different thread which tries to concurrently fail the stream. + ((HTTP2Session)stream.getSession()).getEndPoint().shutdownOutput(); // Try to write something to force an error. stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true), Callback.NOOP); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 8d3adb4e647..c37ffa0d222 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -239,7 +239,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. { Runnable task = pollTask(); if (LOG.isDebugEnabled()) - LOG.debug("Dequeued task {}", task); + LOG.debug("Dequeued task {}", String.valueOf(task)); if (task != null) return task; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index b228e4a340a..26efd12415b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -365,7 +365,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable // If the failure came from within the // flusher, we need to close the connection. if (closed == null) - session.abort(x); + session.onWriteFailure(x); } void terminate(Throwable cause) @@ -376,7 +376,7 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable closed = terminated; terminated = cause; if (LOG.isDebugEnabled()) - LOG.debug("{}", closed != null ? "Terminated" : "Terminating"); + LOG.debug("{} {}", closed != null ? "Terminated" : "Terminating", this); } if (closed == null) iterate(); diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 6c038817946..01f9eaefe85 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -23,7 +23,6 @@ import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -35,13 +34,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; -import org.eclipse.jetty.http2.frames.DisconnectFrame; import org.eclipse.jetty.http2.frames.FailureFrame; import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.FrameType; @@ -72,6 +71,7 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; import org.eclipse.jetty.util.thread.Scheduler; @ManagedObject @@ -82,14 +82,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private final ConcurrentMap streams = new ConcurrentHashMap<>(); private final AtomicLong streamsOpened = new AtomicLong(); private final AtomicLong streamsClosed = new AtomicLong(); - private final StreamCreator streamCreator = new StreamCreator(); + private final StreamsState streamsState = new StreamsState(); private final AtomicInteger localStreamIds = new AtomicInteger(); private final AtomicInteger lastRemoteStreamId = new AtomicInteger(); private final AtomicInteger localStreamCount = new AtomicInteger(); private final AtomicBiInteger remoteStreamCount = new AtomicBiInteger(); private final AtomicInteger sendWindow = new AtomicInteger(); private final AtomicInteger recvWindow = new AtomicInteger(); - private final AtomicReference closed = new AtomicReference<>(CloseState.NOT_CLOSED); private final AtomicLong bytesWritten = new AtomicLong(); private final Scheduler scheduler; private final EndPoint endPoint; @@ -103,8 +102,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private int initialSessionRecvWindow; private int writeThreshold; private boolean pushEnabled; - private long idleTime; - private GoAwayFrame closeFrame; public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId) { @@ -117,13 +114,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio this.maxLocalStreams = -1; this.maxRemoteStreams = -1; this.localStreamIds.set(initialStreamId); - this.lastRemoteStreamId.set(isClientStream(initialStreamId) ? 0 : -1); this.streamIdleTimeout = endPoint.getIdleTimeout(); this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.writeThreshold = 32 * 1024; this.pushEnabled = true; // SPEC: by default, push is enabled. - this.idleTime = System.nanoTime(); addBean(flowControl); addBean(flusher); } @@ -132,26 +127,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio protected void doStop() throws Exception { super.doStop(); - close(ErrorCode.NO_ERROR.code, "stop", new Callback() - { - @Override - public void succeeded() - { - disconnect(); - } - - @Override - public void failed(Throwable x) - { - disconnect(); - } - - @Override - public InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } - }); + streamsState.halt("stop"); } @ManagedAttribute(value = "The flow control strategy", readonly = true) @@ -250,7 +226,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } @Override - public void onData(final DataFrame frame, Callback callback) + public void onData(DataFrame frame, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug("Received {} on {}", frame, this); @@ -266,9 +242,22 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio if (stream != null) { if (getRecvWindow() < 0) - onConnectionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", callback); + { + onSessionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "session_window_exceeded", callback); + } else - stream.process(frame, new DataCallback(callback, stream, flowControlLength)); + { + if (stream.updateRecvWindow(0) < 0) + { + // It's a bad client, it does not deserve to be + // treated gently by just resetting the stream. + onSessionFailure(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", callback); + } + else + { + stream.process(frame, new DataCallback(callback, stream, flowControlLength)); + } + } } else { @@ -280,7 +269,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio if (isStreamClosed(streamId)) reset(null, new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), callback); else - onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_data_frame", callback); + onSessionFailure(ErrorCode.PROTOCOL_ERROR.code, "unexpected_data_frame", callback); } } @@ -455,38 +444,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio * @see #close(int, String, Callback) * @see #onShutdown() * @see #onIdleTimeout() + * TODO: Review javadocs */ @Override - public void onGoAway(final GoAwayFrame frame) + public void onGoAway(GoAwayFrame frame) { if (LOG.isDebugEnabled()) LOG.debug("Received {} on {}", frame, this); - - while (true) - { - CloseState current = closed.get(); - switch (current) - { - case NOT_CLOSED: - { - if (closed.compareAndSet(current, CloseState.REMOTELY_CLOSED)) - { - // We received a GO_AWAY, so try to write - // what's in the queue and then disconnect. - closeFrame = frame; - onClose(frame, new DisconnectCallback()); - return; - } - break; - } - default: - { - if (LOG.isDebugEnabled()) - LOG.debug("Ignored {}, already closed", frame); - return; - } - } - } + streamsState.onGoAway(frame); } @Override @@ -532,18 +497,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void onStreamFailure(int streamId, int error, String reason) { - Callback callback = new ResetCallback(streamId, error, Callback.NOOP); + Callback callback = Callback.from(() -> reset(getStream(streamId), new ResetFrame(streamId, error), Callback.NOOP)); Throwable failure = toFailure(error, reason); if (LOG.isDebugEnabled()) LOG.debug("Stream #{} failure {}", streamId, this, failure); - onStreamFailure(streamId, error, reason, failure, callback); - } - - private void onStreamFailure(int streamId, int error, String reason, Throwable failure, Callback callback) - { IStream stream = getStream(streamId); if (stream != null) - stream.process(new FailureFrame(error, reason, failure), callback); + failStream(stream, error, reason, failure, callback); else callback.succeeded(); } @@ -551,22 +511,24 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void onConnectionFailure(int error, String reason) { - onConnectionFailure(error, reason, Callback.NOOP); + onSessionFailure(error, reason, Callback.NOOP); } - protected void onConnectionFailure(int error, String reason, Callback callback) + private void onSessionFailure(int error, String reason, Callback callback) { - Throwable failure = toFailure(error, reason); - if (LOG.isDebugEnabled()) - LOG.debug("Session failure {}", this, failure); - onFailure(error, reason, failure, new CloseCallback(error, reason, callback)); + streamsState.onSessionFailure(error, reason, callback); } - protected void abort(Throwable failure) + void onWriteFailure(Throwable failure) + { + streamsState.onWriteFailure(failure); + } + + protected void abort(String reason, Throwable failure, Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("Session abort {}", this, failure); - onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure)); + LOG.debug("Session abort {} for {}", reason, this, failure); + onFailure(ErrorCode.NO_ERROR.code, reason, failure, callback); } private void onFailure(int error, String reason, Throwable failure, Callback callback) @@ -576,26 +538,35 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio Callback countCallback = new CountingCallback(callback, count + 1); for (Stream stream : streams) { - onStreamFailure(stream.getId(), error, reason, failure, countCallback); + if (stream.isClosed()) + countCallback.succeeded(); + else + failStream(stream, error, reason, failure, countCallback); } notifyFailure(this, failure, countCallback); } - private void onClose(GoAwayFrame frame, Callback callback) + private void failStreams(Predicate matcher, String reason, boolean reset) { - int error = frame.getError(); - String reason = frame.tryConvertPayload(); + int error = ErrorCode.CANCEL_STREAM_ERROR.code; Throwable failure = toFailure(error, reason); - if (LOG.isDebugEnabled()) - LOG.debug("Session close {}", this, failure); - Collection streams = getStreams(); - int count = streams.size(); - Callback countCallback = new CountingCallback(callback, count + 1); - for (Stream stream : streams) + for (Stream stream : getStreams()) { - onStreamFailure(stream.getId(), error, reason, failure, countCallback); + if (stream.isClosed()) + continue; + if (!matcher.test((IStream)stream)) + continue; + if (LOG.isDebugEnabled()) + LOG.debug("Failing stream {} of {}", stream, this); + failStream(stream, error, reason, failure, Callback.NOOP); + if (reset) + stream.reset(new ResetFrame(stream.getId(), error), Callback.NOOP); } - notifyClose(this, frame, countCallback); + } + + private void failStream(Stream stream, int error, String reason, Throwable failure, Callback callback) + { + ((IStream)stream).process(new FailureFrame(error, reason, failure), callback); } private Throwable toFailure(int error, String reason) @@ -612,19 +583,19 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void newStream(IStream.FrameList frames, Promise promise, Stream.Listener listener) { - streamCreator.newStream(frames, promise, listener); + streamsState.newLocalStream(frames, promise, listener); } @Override public int priority(PriorityFrame frame, Callback callback) { - return streamCreator.priority(frame, callback); + return streamsState.priority(frame, callback); } @Override public void push(IStream stream, Promise promise, PushPromiseFrame frame, Stream.Listener listener) { - streamCreator.push(frame, new Promise.Wrapper(promise) + streamsState.push(frame, new Promise.Wrapper(promise) { @Override public void succeeded(Stream pushedStream) @@ -686,37 +657,22 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio * @see #onGoAway(GoAwayFrame) * @see #onShutdown() * @see #onIdleTimeout() + * // TODO: review javadocs */ @Override public boolean close(int error, String reason, Callback callback) { - while (true) - { - CloseState current = closed.get(); - switch (current) - { - case NOT_CLOSED: - { - if (closed.compareAndSet(current, CloseState.LOCALLY_CLOSED)) - { - closeFrame = newGoAwayFrame(CloseState.LOCALLY_CLOSED, error, reason); - control(null, callback, closeFrame); - return true; - } - break; - } - default: - { - if (LOG.isDebugEnabled()) - LOG.debug("Ignoring close {}/{}, already closed", error, reason); - callback.succeeded(); - return false; - } - } - } + if (LOG.isDebugEnabled()) + LOG.debug("Closing {}/{} {}", ErrorCode.toString(error, null), reason, this); + return goAway(newGoAwayFrame(error, reason), callback); } - private GoAwayFrame newGoAwayFrame(CloseState closeState, int error, String reason) + public boolean goAway(GoAwayFrame frame, Callback callback) + { + return streamsState.goAway(frame, callback); + } + + private GoAwayFrame newGoAwayFrame(int error, String reason) { byte[] payload = null; if (reason != null) @@ -725,13 +681,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio reason = reason.substring(0, Math.min(reason.length(), 32)); payload = reason.getBytes(StandardCharsets.UTF_8); } - return new GoAwayFrame(closeState, getLastRemoteStreamId(), error, payload); + return new GoAwayFrame(getLastRemoteStreamId(), error, payload); } @Override public boolean isClosed() { - return closed.get() != CloseState.NOT_CLOSED; + return getCloseState() != CloseState.NOT_CLOSED; + } + + public CloseState getCloseState() + { + return streamsState.getCloseState(); } private void control(IStream stream, Callback callback, Frame frame) @@ -786,15 +747,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - protected IStream createLocalStream(int streamId) + protected IStream createLocalStream(int streamId, Promise promise) { while (true) { int localCount = localStreamCount.get(); int maxCount = getMaxLocalStreams(); if (maxCount >= 0 && localCount >= maxCount) - // TODO: remove the dump() in the exception message. - throw new IllegalStateException("Max local stream count " + maxCount + " exceeded" + System.lineSeparator() + dump()); + { + promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded")); + return null; + } if (localStreamCount.compareAndSet(localCount, localCount + 1)) break; } @@ -811,12 +774,23 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio else { localStreamCount.decrementAndGet(); - throw new IllegalStateException("Duplicate stream " + streamId); + promise.failed(new IllegalStateException("Duplicate stream " + streamId)); + return null; } } protected IStream createRemoteStream(int streamId) { + if (!streamsState.newRemoteStream(streamId)) + { + if (LOG.isDebugEnabled()) + LOG.debug("Could not create remote stream #{} for {}", streamId, this); + return null; + } + + // This stream has been seen the server. + updateLastRemoteStreamId(streamId); + // SPEC: exceeding max concurrent streams is treated as stream error. while (true) { @@ -826,8 +800,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio int maxCount = getMaxRemoteStreams(); if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) { - updateLastRemoteStreamId(streamId); - reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP); + reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId))); return null; } if (remoteStreamCount.compareAndSet(encoded, remoteCount + 1, remoteClosing)) @@ -835,11 +808,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } IStream stream = newStream(streamId, false); - - // SPEC: duplicate stream is treated as connection error. if (streams.putIfAbsent(streamId, stream) == null) { - updateLastRemoteStreamId(streamId); stream.setIdleTimeout(getStreamIdleTimeout()); flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) @@ -849,6 +819,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio else { remoteStreamCount.addAndGetHi(-1); + onStreamDestroyed(streamId); + // SPEC: duplicate stream is treated as connection error. onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream"); return null; } @@ -868,16 +840,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } @Override - public void removeStream(IStream stream) + public boolean removeStream(IStream stream) { - IStream removed = streams.remove(stream.getId()); - if (removed != null) - { - onStreamClosed(stream); - flowControl.onStreamDestroyed(stream); - if (LOG.isDebugEnabled()) - LOG.debug("Removed {} {} from {}", stream.isLocal() ? "local" : "remote", stream, this); - } + int streamId = stream.getId(); + IStream removed = streams.remove(streamId); + if (removed == null) + return false; + if (LOG.isDebugEnabled()) + LOG.debug("Removed {} {} from {}", stream.isLocal() ? "local" : "remote", stream, this); + onStreamClosed(stream); + flowControl.onStreamDestroyed(stream); + onStreamDestroyed(streamId); + return true; } @Override @@ -889,7 +863,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @ManagedAttribute("The number of active streams") public int getStreamCount() { - return streams.size(); + return streamsState.streamCount.intValue(); } @Override @@ -964,41 +938,12 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio * @see #onGoAway(GoAwayFrame) * @see #close(int, String, Callback) * @see #onIdleTimeout() + * TODO: review javadocs */ @Override public void onShutdown() { - if (LOG.isDebugEnabled()) - LOG.debug("Shutting down {}", this); - - switch (closed.get()) - { - case NOT_CLOSED: - { - // The other peer did not send a GO_AWAY, no need to be gentle. - if (LOG.isDebugEnabled()) - LOG.debug("Abrupt close for {}", this); - abort(new ClosedChannelException()); - break; - } - case LOCALLY_CLOSED: - { - // We have closed locally, and only shutdown - // the output; now queue a disconnect. - control(null, Callback.NOOP, new DisconnectFrame()); - break; - } - case REMOTELY_CLOSED: - { - // Nothing to do, the GO_AWAY frame we - // received will close the connection. - break; - } - default: - { - break; - } - } + streamsState.onShutdown(this); } /** @@ -1020,35 +965,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio * @see #onGoAway(GoAwayFrame) * @see #close(int, String, Callback) * @see #onShutdown() + * TODO: review javadocs */ @Override public boolean onIdleTimeout() { - switch (closed.get()) - { - case NOT_CLOSED: - { - long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime); - if (elapsed < endPoint.getIdleTimeout()) - return false; - return notifyIdleTimeout(this); - } - case LOCALLY_CLOSED: - case REMOTELY_CLOSED: - { - abort(new TimeoutException("Idle timeout " + endPoint.getIdleTimeout() + " ms")); - return false; - } - default: - { - return false; - } - } + return streamsState.onIdleTimeout(this); } private void notIdle() { - idleTime = System.nanoTime(); + streamsState.idleTime = System.nanoTime(); } @Override @@ -1057,22 +984,46 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "upgrade"); } + protected void onStreamCreated(int streamId) + { + if (LOG.isDebugEnabled()) + LOG.debug("Created stream #{} for {}", streamId, this); + streamsState.onStreamCreated(); + } + protected void onStreamOpened(IStream stream) { + if (LOG.isDebugEnabled()) + LOG.debug("Opened stream {} for {}", stream, this); streamsOpened.incrementAndGet(); } protected void onStreamClosed(IStream stream) { + if (LOG.isDebugEnabled()) + LOG.debug("Closed stream {} for {}", stream, this); streamsClosed.incrementAndGet(); } + protected void onStreamDestroyed(int streamId) + { + if (LOG.isDebugEnabled()) + LOG.debug("Destroyed stream #{} for {}", streamId, this); + streamsState.onStreamDestroyed(); + } + @Override public void onFlushed(long bytes) throws IOException { flusher.onFlushed(bytes); } + private void terminate(Throwable cause) + { + flusher.terminate(cause); + disconnect(); + } + public void disconnect() { if (LOG.isDebugEnabled()) @@ -1080,38 +1031,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio endPoint.close(); } - private void terminate(Throwable cause) - { - while (true) - { - CloseState current = closed.get(); - switch (current) - { - case NOT_CLOSED: - case LOCALLY_CLOSED: - case REMOTELY_CLOSED: - { - if (closed.compareAndSet(current, CloseState.CLOSED)) - { - flusher.terminate(cause); - for (IStream stream : streams.values()) - { - stream.close(); - } - streams.clear(); - disconnect(); - return; - } - break; - } - default: - { - return; - } - } - } - } - public boolean isDisconnected() { return !endPoint.isOpen(); @@ -1176,6 +1095,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } + protected void notifyGoAway(Session session, GoAwayFrame frame) + { + try + { + listener.onGoAway(session, frame); + } + catch (Throwable x) + { + LOG.info("Failure while notifying listener " + listener, x); + } + } + protected void notifyClose(Session session, GoAwayFrame frame, Callback callback) { try @@ -1243,16 +1174,15 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public String toString() { - return String.format("%s@%x{l:%s <-> r:%s,sendWindow=%s,recvWindow=%s,streams=%d,%s,%s}", + return String.format("%s@%x{local:%s,remote:%s,sendWindow=%s,recvWindow=%s,%s}", getClass().getSimpleName(), hashCode(), getEndPoint().getLocalAddress(), getEndPoint().getRemoteAddress(), sendWindow, recvWindow, - streams.size(), - closed, - closeFrame); + streamsState + ); } private class ControlEntry extends HTTP2Flusher.Entry @@ -1342,29 +1272,18 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { case HEADERS: { - onStreamOpened(stream); HeadersFrame headersFrame = (HeadersFrame)frame; + if (headersFrame.getMetaData().isRequest()) + onStreamOpened(stream); if (stream.updateClose(headersFrame.isEndStream(), CloseState.Event.AFTER_SEND)) removeStream(stream); break; } - case GO_AWAY: - { - // We just sent a GO_AWAY, only shutdown the - // output without closing yet, to allow reads. - getEndPoint().shutdownOutput(); - break; - } case WINDOW_UPDATE: { flowControl.windowUpdate(HTTP2Session.this, stream, (WindowUpdateFrame)frame); break; } - case DISCONNECT: - { - terminate(new ClosedChannelException()); - break; - } default: { break; @@ -1373,14 +1292,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio super.succeeded(); } - - @Override - public void failed(Throwable x) - { - if (frame.getType() == FrameType.DISCONNECT) - terminate(new ClosedChannelException()); - super.failed(x); - } } private class DataEntry extends HTTP2Flusher.Entry @@ -1483,30 +1394,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - private static class StreamPromiseCallback implements Callback - { - private final Promise promise; - private final IStream stream; - - private StreamPromiseCallback(Promise promise, IStream stream) - { - this.promise = promise; - this.stream = stream; - } - - @Override - public void succeeded() - { - promise.succeeded(stream); - } - - @Override - public void failed(Throwable x) - { - promise.failed(x); - } - } - private class DataCallback extends Callback.Nested { private final IStream stream; @@ -1543,36 +1430,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - private class ResetCallback extends Callback.Nested - { - private final int streamId; - private final int error; - - private ResetCallback(int streamId, int error, Callback callback) - { - super(callback); - this.streamId = streamId; - this.error = error; - } - - @Override - public void succeeded() - { - complete(); - } - - @Override - public void failed(Throwable x) - { - complete(); - } - - private void complete() - { - reset(getStream(streamId), new ResetFrame(streamId, error), getCallback()); - } - } - private class OnResetCallback implements Callback { @Override @@ -1599,97 +1456,6 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } - private class CloseCallback extends Callback.Nested - { - private final int error; - private final String reason; - - private CloseCallback(int error, String reason, Callback callback) - { - super(callback); - this.error = error; - this.reason = reason; - } - - @Override - public void succeeded() - { - complete(); - } - - @Override - public void failed(Throwable x) - { - complete(); - } - - private void complete() - { - close(error, reason, getCallback()); - } - } - - private class DisconnectCallback implements Callback - { - @Override - public void succeeded() - { - complete(); - } - - @Override - public void failed(Throwable x) - { - complete(); - } - - @Override - public InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } - - private void complete() - { - frames(null, Arrays.asList(newGoAwayFrame(CloseState.CLOSED, ErrorCode.NO_ERROR.code, null), new DisconnectFrame()), Callback.NOOP); - } - } - - private class TerminateCallback implements Callback - { - private final Throwable failure; - - private TerminateCallback(Throwable failure) - { - this.failure = failure; - } - - @Override - public void succeeded() - { - complete(); - } - - @Override - public void failed(Throwable x) - { - if (x != failure) - failure.addSuppressed(x); - complete(); - } - - @Override - public InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } - - private void complete() - { - terminate(failure); - } - } - /** * SPEC: It is required that stream ids are monotonically increasing. * Here we use a queue to atomically create the stream id and @@ -1697,107 +1463,696 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio * flush up to the slot with a non-null entry to make sure * frames are sent strictly in their stream id order. * See https://tools.ietf.org/html/rfc7540#section-5.1.1. + * TODO: javadocs */ - private class StreamCreator + private class StreamsState { + private final Locker lock = new Locker(); private final Queue slots = new ArrayDeque<>(); + private final AtomicLong streamCount = new AtomicLong(); + private long idleTime = System.nanoTime(); + private CloseState closed = CloseState.NOT_CLOSED; + private Runnable closingAction; + private GoAwayFrame goAwayRecv; + private GoAwayFrame goAwaySent; + private Throwable failure; private Thread flushing; + private CloseState getCloseState() + { + try (Locker.Lock l = lock.lock()) + { + return closed; + } + } + + private boolean goAway(GoAwayFrame frame, Callback callback) + { + Runnable action = null; + boolean sendGoAway = false; + try (Locker.Lock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + { + goAwaySent = frame; + closed = CloseState.LOCALLY_CLOSED; + sendGoAway = true; + if (frame.isGraceful()) + { + // Try to send the non-graceful GOAWAY + // when the last stream is destroyed. + closingAction = action = () -> + { + // TODO: verify this + GoAwayFrame goAwayFrame = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + goAway(goAwayFrame, Callback.from(Callback.NOOP::succeeded, HTTP2Session.this::terminate)); + }; + } + break; + } + case LOCALLY_CLOSED: + { + if (frame.isGraceful()) + { + // Trying to send a non-first, but graceful, GOAWAY, ignore this one. + if (LOG.isDebugEnabled()) + LOG.debug("Already sent, ignored GOAWAY {} for {}", frame, HTTP2Session.this); + } + else + { + if (goAwaySent.isGraceful()) + { + goAwaySent = frame; + sendGoAway = true; + } + else + { + // Trying to send another non-graceful GOAWAY, ignore this one. + if (LOG.isDebugEnabled()) + LOG.debug("Already sent, ignored GOAWAY {} for {}", frame, HTTP2Session.this); + } + } + break; + } + case REMOTELY_CLOSED: + { + goAwaySent = frame; + sendGoAway = true; + if (frame.isGraceful()) + { + // Try to send the non-graceful GOAWAY + // when the last stream is destroyed. + closingAction = action = () -> + { + // TODO: verify this + GoAwayFrame goAwayFrame = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + goAway(goAwayFrame, Callback.from(Callback.NOOP::succeeded, HTTP2Session.this::terminate)); + }; + } + else + { + if (goAwayRecv.isGraceful()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Waiting non-graceful GOAWAY for {}", HTTP2Session.this); + } + else + { + closed = CloseState.CLOSING; + closingAction = action = () -> terminate(frame); + } + } + break; + } + default: + { + // Already closing or closed, ignore it. + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignored {} for {}", frame, HTTP2Session.this); + break; + } + } + } + + if (sendGoAway) + { + if (action == null) + sendGoAway(frame, callback); + else + sendGoAway(frame, Callback.from(callback, this::tryRunClosingAction)); + return true; + } + else + { + callback.succeeded(); + return false; + } + } + + private void halt(String reason) + { + if (LOG.isDebugEnabled()) + LOG.debug("Halting ({}) for {}", reason, HTTP2Session.this); + GoAwayFrame frame; + boolean sendGoAway = false; + try (Locker.Lock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + case REMOTELY_CLOSED: + case LOCALLY_CLOSED: + case CLOSING: + { + if (goAwaySent == null || goAwaySent.isGraceful()) + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason); + sendGoAway = true; + } + frame = goAwaySent; + closed = CloseState.CLOSED; + failure = toFailure(ErrorCode.NO_ERROR.code, reason); + break; + } + default: + { + return; + } + } + } + failStreams(stream -> true, reason, true); + if (sendGoAway) + sendGoAwayAndTerminate(frame); + else + terminate(frame); + } + + private void onGoAway(GoAwayFrame frame) + { + boolean failStreams = false; + Runnable action = null; + try (Locker.Lock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + { + goAwayRecv = frame; + closed = CloseState.REMOTELY_CLOSED; + if (frame.isGraceful()) + { + if (LOG.isDebugEnabled()) + LOG.debug("Waiting non-graceful GOAWAY for {}", HTTP2Session.this); + } + else + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + closed = CloseState.CLOSING; + closingAction = action = () -> sendGoAwayAndTerminate(frame); + failStreams = true; + } + break; + } + case LOCALLY_CLOSED: + { + goAwayRecv = frame; + if (frame.isGraceful()) + { + // Wait for the non-graceful GOAWAY from the other peer. + if (LOG.isDebugEnabled()) + LOG.debug("Waiting non-graceful GOAWAY for {}", HTTP2Session.this); + } + else + { + if (goAwaySent.isGraceful()) + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + closed = CloseState.CLOSING; + closingAction = action = () -> sendGoAwayAndTerminate(frame); + } + else + { + closed = CloseState.CLOSING; + closingAction = action = () -> terminate(frame); + failStreams = true; + } + } + break; + } + case REMOTELY_CLOSED: + { + if (frame.isGraceful()) + { + // Received a non-first, but graceful, GOAWAY, ignore it. + if (LOG.isDebugEnabled()) + LOG.debug("Already received, ignoring GOAWAY for {}", HTTP2Session.this); + } + else + { + boolean shouldSend = goAwaySent == null || goAwaySent.isGraceful(); + goAwayRecv = frame; + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, "close"); + closed = CloseState.CLOSING; + closingAction = action = shouldSend ? () -> sendGoAwayAndTerminate(frame) : () -> terminate(frame); + failStreams = true; + } + break; + } + default: + { + // Already closing or closed, ignore it. + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignored {} for {}", frame, HTTP2Session.this); + break; + } + } + } + + notifyGoAway(HTTP2Session.this, frame); + + if (failStreams) + { + // Must compare the lastStreamId only with local streams. + // For example, a client that sent request with streamId=137 may send a GOAWAY(4), + // where streamId=4 is the last stream pushed by the server to the client. + // The server must not compare its local streamId=4 with remote streamId=137. + Predicate failIf = stream -> stream.isLocal() && stream.getId() > frame.getLastStreamId(); + failStreams(failIf, "closing", false); + } + + if (action != null) + tryRunClosingAction(); + } + + private void onShutdown(HTTP2Session session) + { + String reason = "input_shutdown"; + boolean resetStreams = false; + try (Locker.Lock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + case LOCALLY_CLOSED: + { + if (LOG.isDebugEnabled()) + LOG.debug("Unexpected ISHUT for {}", session); + closed = CloseState.CLOSING; + GoAwayFrame frame = new GoAwayFrame(0, ErrorCode.NO_ERROR.code, reason.getBytes(StandardCharsets.UTF_8)); + closingAction = () -> terminate(frame); + failure = new ClosedChannelException(); + break; + } + case REMOTELY_CLOSED: + { + closed = CloseState.CLOSING; + GoAwayFrame frame = new GoAwayFrame(0, ErrorCode.NO_ERROR.code, reason.getBytes(StandardCharsets.UTF_8)); + closingAction = () -> terminate(frame); + failure = new ClosedChannelException(); + resetStreams = true; + break; + } + case CLOSING: + { + if (failure == null) + failure = new ClosedChannelException(); + resetStreams = true; + break; + } + default: + { + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignoring ISHUT for {}", session); + return; + } + } + } + + if (resetStreams) + { + // Since nothing else will arrive from the other peer, reset + // the streams for which the other peer did not send all frames. + Predicate failIf = stream -> !stream.isRemotelyClosed(); + failStreams(failIf, reason, false); + tryRunClosingAction(); + } + else + { + abort(reason, failure, Callback.from(this::tryRunClosingAction)); + } + } + + private boolean onIdleTimeout(HTTP2Session session) + { + String reason = "idle_timeout"; + boolean notify = false; + Throwable cause = null; + try (Locker.Lock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + { + long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime); + if (elapsed < endPoint.getIdleTimeout()) + return false; + notify = true; + break; + } + // Timed out while waiting for closing events. + case LOCALLY_CLOSED: + { + boolean shouldSend = goAwaySent.isGraceful(); + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason); + closed = CloseState.CLOSING; + closingAction = shouldSend ? () -> sendGoAwayAndTerminate(goAwaySent) : () -> terminate(goAwaySent); + failure = cause = new TimeoutException("Session idle timeout expired"); + break; + } + case REMOTELY_CLOSED: + { + goAwaySent = newGoAwayFrame(ErrorCode.NO_ERROR.code, reason); + closed = CloseState.CLOSING; + closingAction = () -> sendGoAwayAndTerminate(goAwaySent); + failure = cause = new TimeoutException("Session idle timeout expired"); + break; + } + default: + { + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignored idle timeout for {}", session); + return false; + } + } + } + + if (notify) + { + boolean close = notifyIdleTimeout(session); + if (LOG.isDebugEnabled()) + LOG.debug("Idle timeout {} for {}", close ? "confirmed" : "ignored", session); + if (close) + halt(reason); + return false; + } + + abort(reason, cause, Callback.from(this::tryRunClosingAction)); + return false; + } + + private void onSessionFailure(int error, String reason, Callback callback) + { + try (Locker.Lock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + { + goAwayRecv = goAwaySent = newGoAwayFrame(error, reason); + closed = CloseState.CLOSING; + closingAction = () -> sendGoAwayAndTerminate(goAwaySent); + failure = toFailure(error, reason); + break; + } + case LOCALLY_CLOSED: + { + goAwayRecv = newGoAwayFrame(error, reason); + closed = CloseState.CLOSING; + closingAction = goAwaySent.isGraceful() ? () -> sendGoAwayAndTerminate(goAwayRecv) : () -> terminate(goAwayRecv); + failure = toFailure(error, reason); + break; + } + case REMOTELY_CLOSED: + { + goAwaySent = newGoAwayFrame(error, reason); + closed = CloseState.CLOSING; + closingAction = () -> sendGoAwayAndTerminate(goAwaySent); + failure = toFailure(error, reason); + break; + } + default: + { + if (LOG.isDebugEnabled()) + LOG.debug("Already closed, ignored session failure {}", HTTP2Session.this, failure); + callback.succeeded(); + return; + } + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("Session failure {}", HTTP2Session.this, failure); + onFailure(error, reason, failure, Callback.from(callback, this::tryRunClosingAction)); + } + + private void onWriteFailure(Throwable x) + { + String reason = "write_failure"; + try (Locker.Lock l = lock.lock()) + { + switch (closed) + { + case NOT_CLOSED: + case LOCALLY_CLOSED: + case REMOTELY_CLOSED: + { + closed = CloseState.CLOSING; + closingAction = () -> terminate(newGoAwayFrame(ErrorCode.NO_ERROR.code, reason)); + failure = x; + break; + } + default: + { + return; + } + } + } + abort(reason, x, Callback.from(this::tryRunClosingAction)); + } + + private void sendGoAwayAndTerminate(GoAwayFrame frame) + { + sendGoAway(frame, Callback.from(() -> terminate(frame))); + } + + private void sendGoAway(GoAwayFrame frame, Callback callback) + { + control(null, callback, frame); + } + + private void onStreamCreated() + { + streamCount.incrementAndGet(); + } + + private void onStreamDestroyed() + { + long count = streamCount.decrementAndGet(); + if (count == 0) + runClosingAction(); + } + + private void tryRunClosingAction() + { + long count = streamCount.get(); + if (count == 0) + { + runClosingAction(); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Deferred closing action, {} pending streams on {}", count, HTTP2Session.this); + } + } + + private void runClosingAction() + { + // Threads from onStreamClosed() and other events + // such as onGoAway() may be in a race to finish, + // but only one moves to CLOSED and runs the action. + Runnable action = null; + try (Locker.Lock l = lock.lock()) + { + switch (closed) + { + case LOCALLY_CLOSED: + { + if (goAwaySent.isGraceful()) + { + action = closingAction; + closingAction = null; + } + break; + } + case REMOTELY_CLOSED: + { + if (goAwaySent != null && goAwaySent.isGraceful()) + { + action = closingAction; + closingAction = null; + } + break; + } + case CLOSING: + { + closed = CloseState.CLOSED; + action = closingAction; + closingAction = null; + break; + } + default: + { + break; + } + } + } + if (action != null) + { + if (LOG.isDebugEnabled()) + LOG.debug("Executing closing action on {}", HTTP2Session.this); + action.run(); + } + } + + private void terminate(GoAwayFrame frame) + { + if (LOG.isDebugEnabled()) + LOG.debug("Terminating {}", HTTP2Session.this); + HTTP2Session.this.terminate(failure); + notifyClose(HTTP2Session.this, frame, Callback.NOOP); + } + private int priority(PriorityFrame frame, Callback callback) { Slot slot = new Slot(); int currentStreamId = frame.getStreamId(); - int streamId = reserveSlot(slot, currentStreamId); - - if (currentStreamId <= 0) - frame = frame.withStreamId(streamId); - - slot.entries = Collections.singletonList(newEntry(frame, null, callback)); - flush(); + int streamId = reserveSlot(slot, currentStreamId, callback::failed); + if (streamId > 0) + { + if (currentStreamId <= 0) + frame = frame.withStreamId(streamId); + slot.entries = Collections.singletonList(newEntry(frame, null, Callback.from(callback::succeeded, x -> + { + HTTP2Session.this.onStreamDestroyed(streamId); + callback.failed(x); + }))); + flush(); + } return streamId; } - private void newStream(IStream.FrameList frameList, Promise promise, Stream.Listener listener) + private void newLocalStream(IStream.FrameList frameList, Promise promise, Stream.Listener listener) { Slot slot = new Slot(); int currentStreamId = frameList.getStreamId(); - int streamId = reserveSlot(slot, currentStreamId); - - List frames = frameList.getFrames(); - if (currentStreamId <= 0) + int streamId = reserveSlot(slot, currentStreamId, promise::failed); + if (streamId > 0) { - frames = frames.stream() - .map(frame -> frame.withStreamId(streamId)) - .collect(Collectors.toList()); + List frames = frameList.getFrames(); + if (currentStreamId <= 0) + { + frames = frames.stream() + .map(frame -> frame.withStreamId(streamId)) + .collect(Collectors.toList()); + } + if (createLocalStream(slot, frames, promise, listener, streamId)) + return; + freeSlot(slot, streamId); } + } - try + private boolean newRemoteStream(int streamId) + { + try (Locker.Lock l = lock.lock()) { - createLocalStream(slot, frames, promise, listener, streamId); - } - catch (Throwable x) - { - freeSlotAndFailPromise(slot, promise, x); + switch (closed) + { + case NOT_CLOSED: + { + HTTP2Session.this.onStreamCreated(streamId); + return true; + } + case LOCALLY_CLOSED: + { + // SPEC: streams larger than GOAWAY's lastStreamId are dropped. + if (streamId <= goAwaySent.getLastStreamId()) + { + // Allow creation of streams that may have been in-flight. + HTTP2Session.this.onStreamCreated(streamId); + return true; + } + return false; + } + default: + return false; + } } } private void push(PushPromiseFrame frame, Promise promise, Stream.Listener listener) { Slot slot = new Slot(); - int streamId = reserveSlot(slot, 0); - frame = frame.withStreamId(streamId); - - try + int streamId = reserveSlot(slot, 0, promise::failed); + if (streamId > 0) { - createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId); - } - catch (Throwable x) - { - freeSlotAndFailPromise(slot, promise, x); + frame = frame.withStreamId(streamId); + if (createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId)) + return; + freeSlot(slot, streamId); } } - private int reserveSlot(Slot slot, int streamId) + private boolean createLocalStream(Slot slot, List frames, Promise promise, Stream.Listener listener, int streamId) { - if (streamId <= 0) + IStream stream = HTTP2Session.this.createLocalStream(streamId, promise); + if (stream == null) + return false; + + stream.setListener(listener); + Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x -> { - synchronized (this) - { - streamId = localStreamIds.getAndAdd(2); - slots.offer(slot); - } + HTTP2Session.this.onStreamDestroyed(streamId); + promise.failed(x); + }); + int count = frames.size(); + if (count == 1) + { + slot.entries = Collections.singletonList(newEntry(frames.get(0), stream, streamCallback)); } else { - synchronized (this) + Callback callback = new CountingCallback(streamCallback, count); + slot.entries = frames.stream() + .map(frame -> newEntry(frame, stream, callback)) + .collect(Collectors.toList()); + } + flush(); + return true; + } + + private int reserveSlot(Slot slot, int streamId, Consumer fail) + { + int newStreamId = streamId; + Throwable failure = null; + try (Locker.Lock l = lock.lock()) + { + if (closed == CloseState.NOT_CLOSED) { + if (newStreamId <= 0) + newStreamId = localStreamIds.getAndAdd(2); slots.offer(slot); } + else + { + failure = this.failure; + if (failure == null) + failure = new IllegalStateException("session closed"); + } + } + if (failure == null) + { + if (streamId <= 0) + HTTP2Session.this.onStreamCreated(newStreamId); + return newStreamId; + } + else + { + fail.accept(failure); + return 0; } - return streamId; } - private void createLocalStream(Slot slot, List frames, Promise promise, Stream.Listener listener, int streamId) + private void freeSlot(Slot slot, int streamId) { - IStream stream = HTTP2Session.this.createLocalStream(streamId); - stream.setListener(listener); - int count = frames.size(); - Callback streamCallback = new StreamPromiseCallback(promise, stream); - Callback callback = count == 1 ? streamCallback : new CountingCallback(streamCallback, count); - slot.entries = frames.stream() - .map(frame -> newEntry(frame, stream, callback)) - .collect(Collectors.toList()); - flush(); - } - - private void freeSlotAndFailPromise(Slot slot, Promise promise, Throwable x) - { - synchronized (this) + try (Locker.Lock l = lock.lock()) { slots.remove(slot); } + HTTP2Session.this.onStreamDestroyed(streamId); flush(); - promise.failed(x); } /** @@ -1821,7 +2176,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio while (true) { List entries; - synchronized (this) + try (Locker.Lock l = lock.lock()) { if (flushing == null) flushing = thread; @@ -1845,6 +2200,21 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio flusher.iterate(); } + @Override + public String toString() + { + try (Locker.Lock l = lock.lock()) + { + return String.format("state=[streams=%d,%s,goAwayRecv=%s,goAwaySent=%s,failure=%s]", + streamCount.get(), + closed, + goAwayRecv, + goAwaySent, + failure + ); + } + } + private class Slot { private volatile List entries; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index d0b108230ff..6531d05afea 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -326,21 +326,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString()); dataLength = length >= 0 ? length : Long.MIN_VALUE; } - callback.succeeded(); } private void onData(DataFrame frame, Callback callback) { - if (getRecvWindow() < 0) - { - // It's a bad client, it does not deserve to be - // treated gently by just resetting the stream. - session.close(ErrorCode.FLOW_CONTROL_ERROR.code, "stream_window_exceeded", Callback.NOOP); - callback.failed(new IOException("stream_window_exceeded")); - return; - } - // SPEC: remotely closed streams must be replied with a reset. if (isRemotelyClosed()) { @@ -381,8 +371,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa failure = new EofException("reset"); } close(); - session.removeStream(this); - notifyReset(this, frame, callback); + if (session.removeStream(this)) + notifyReset(this, frame, callback); } private void onPush(PushPromiseFrame frame, Callback callback) @@ -405,8 +395,8 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa failure = frame.getFailure(); } close(); - session.removeStream(this); - notifyFailure(this, frame, callback); + if (session.removeStream(this)) + notifyFailure(this, frame, callback); } @Override diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java index 412e6808722..430d0269b4b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/ISession.java @@ -44,8 +44,9 @@ public interface ISession extends Session *

Removes the given {@code stream}.

* * @param stream the stream to remove + * @return whether the stream was removed */ - void removeStream(IStream stream); + boolean removeStream(IStream stream); /** *

Sends the given list of frames to create a new {@link Stream}.

diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java index 3406d9f0453..8033dbfefbb 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/api/Session.java @@ -97,8 +97,6 @@ public interface Session /** *

Closes the session by sending a GOAWAY frame with the given error code * and payload.

- *

The GOAWAY frame is sent only once; subsequent or concurrent attempts to - * close the session will have no effect.

* * @param error the error code * @param payload an optional payload (may be null) @@ -197,6 +195,16 @@ public interface Session * * @param session the session * @param frame the GOAWAY frame received + */ + default void onGoAway(Session session, GoAwayFrame frame) + { + } + + /** + *

Callback method invoked when a GOAWAY frame caused the session to be closed.

+ * + * @param session the session + * @param frame the GOAWAY frame that caused the session to be closed * @param callback the callback to notify of the GOAWAY processing */ default void onClose(Session session, GoAwayFrame frame, Callback callback) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/GoAwayFrame.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/GoAwayFrame.java index bbd1bcdb703..743eb07b056 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/GoAwayFrame.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/frames/GoAwayFrame.java @@ -20,30 +20,33 @@ package org.eclipse.jetty.http2.frames; import java.nio.charset.StandardCharsets; -import org.eclipse.jetty.http2.CloseState; import org.eclipse.jetty.http2.ErrorCode; public class GoAwayFrame extends Frame { - private final CloseState closeState; + public static final GoAwayFrame GRACEFUL = new GoAwayFrame(Integer.MAX_VALUE, ErrorCode.NO_ERROR.code, new byte[]{'g', 'r', 'a', 'c', 'e', 'f', 'u', 'l'}); + private final int lastStreamId; private final int error; private final byte[] payload; public GoAwayFrame(int lastStreamId, int error, byte[] payload) - { - this(CloseState.REMOTELY_CLOSED, lastStreamId, error, payload); - } - - public GoAwayFrame(CloseState closeState, int lastStreamId, int error, byte[] payload) { super(FrameType.GO_AWAY); - this.closeState = closeState; this.lastStreamId = lastStreamId; this.error = error; this.payload = payload; } + /** + * @return whether this GOAWAY frame is graceful, i.e. its {@code lastStreamId == Integer.MAX_VALUE} + */ + public boolean isGraceful() + { + // SPEC: section 6.8. + return lastStreamId == Integer.MAX_VALUE; + } + public int getLastStreamId() { return lastStreamId; @@ -76,11 +79,10 @@ public class GoAwayFrame extends Frame @Override public String toString() { - return String.format("%s,%d/%s/%s/%s", + return String.format("%s{%d/%s/%s}", super.toString(), lastStreamId, ErrorCode.toString(error, null), - tryConvertPayload(), - closeState); + tryConvertPayload()); } } diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java index b31b6632d76..1f2fd05daa6 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/AbstractTest.java @@ -35,6 +35,7 @@ public class AbstractTest { protected Server server; protected ServerConnector connector; + protected HTTP2Client http2Client; protected HttpClient client; protected void start(ServerSessionListener listener) throws Exception @@ -63,12 +64,13 @@ public class AbstractTest server.addConnector(connector); } - protected void prepareClient() throws Exception + protected void prepareClient() { - client = new HttpClient(new HttpClientTransportOverHTTP2(new HTTP2Client()), null); + http2Client = new HTTP2Client(); + client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client), null); QueuedThreadPool clientExecutor = new QueuedThreadPool(); clientExecutor.setName("client"); - client.setExecutor(clientExecutor); + this.client.setExecutor(clientExecutor); } @AfterEach diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java index 664e5a0fed9..bb89c05a8b3 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2Test.java @@ -220,7 +220,9 @@ public class HttpClientTransportOverHTTP2Test extends AbstractTest MetaData.Request request = (MetaData.Request)frame.getMetaData(); if (HttpMethod.HEAD.is(request.getMethod())) { - stream.getSession().close(ErrorCode.REFUSED_STREAM_ERROR.code, null, Callback.NOOP); + int error = ErrorCode.REFUSED_STREAM_ERROR.code; + stream.reset(new ResetFrame(stream.getId(), error), Callback.NOOP); + stream.getSession().close(error, null, Callback.NOOP); } else { diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java index 7f8a4d9a659..a1c44493584 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/RawHTTP2ServerConnectionFactory.java @@ -104,6 +104,12 @@ public class RawHTTP2ServerConnectionFactory extends AbstractHTTP2ServerConnecti delegate.onReset(session, frame); } + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + delegate.onGoAway(session, frame); + } + @Override public void onClose(Session session, GoAwayFrame frame) { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java index d8ab170f61a..576853ee85f 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/IteratingCallback.java @@ -503,6 +503,6 @@ public abstract class IteratingCallback implements Callback @Override public String toString() { - return String.format("%s[%s]", super.toString(), _state); + return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), _state); } }