From 9a84bbbb71e31d3d2c2ba2819053678e27bacd48 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 13 Oct 2021 11:53:34 +0200 Subject: [PATCH] Issue #6728 - QUIC and HTTP/3 - Implemented graceful shutdown functionality. Signed-off-by: Simone Bordet --- .../jetty/http3/client/HTTP3Client.java | 5 + .../client/internal/ClientHTTP3Session.java | 10 +- .../jetty/http3/internal/HTTP3Session.java | 66 ++-- .../server/internal/ServerHTTP3Session.java | 10 +- .../eclipse/jetty/http3/tests/GoAwayTest.java | 304 +++++++++++++++++- .../jetty/quic/common/ProtocolSession.java | 9 +- .../jetty/quic/common/QuicSession.java | 57 ++-- .../quic/common/QuicSessionContainer.java | 35 +- .../quic/server/QuicServerConnector.java | 19 +- 9 files changed, 417 insertions(+), 98 deletions(-) diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3Client.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3Client.java index 55a269295a4..5d586f3dc05 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3Client.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3Client.java @@ -162,4 +162,9 @@ public class HTTP3Client extends ContainerLifeCycle } return connection; } + + public CompletableFuture shutdown() + { + return container.shutdown(); + } } diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index e71ac597b44..abef376673c 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -14,6 +14,7 @@ package org.eclipse.jetty.http3.client.internal; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.frames.Frame; @@ -167,8 +168,13 @@ public class ClientHTTP3Session extends ClientProtocolSession { if (LOG.isDebugEnabled()) LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this); - // TODO: maybe we should be harsher here... see onIdleTimeout() - session.goAway(false); + session.disconnect(reason); + } + + @Override + public CompletableFuture shutdown() + { + return session.shutdown(); } @Override diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java index 440053d18a2..f4cf56d2d14 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java @@ -45,7 +45,6 @@ import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.thread.AutoLock; -import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +65,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session private GoAwayFrame goAwaySent; private GoAwayFrame goAwayRecv; private Runnable zeroStreamsAction; - private Callback.Completable shutdown; + private CompletableFuture shutdown; public HTTP3Session(ProtocolSession session, Session.Listener listener) { @@ -121,7 +120,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session boolean failStreams = false; boolean sendGoAway = false; - Callback.Completable callback = null; try (AutoLock l = lock.lock()) { switch (closeState) @@ -135,7 +133,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session { // Send the non-graceful GOAWAY when the last stream is destroyed. zeroStreamsAction = () -> goAway(false); - shutdown = callback = new Callback.Completable(); } break; } @@ -170,7 +167,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session { // Send the non-graceful GOAWAY when the last stream is destroyed. zeroStreamsAction = () -> goAway(false); - shutdown = callback = new Callback.Completable(); } else { @@ -203,18 +199,10 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session if (sendGoAway) { - if (callback == null) - { - callback = new Callback.Completable(); - callback.thenRun(this::tryRunZeroStreamsAction); - writeControlFrame(frame, callback); - } - else - { - Callback writeCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, this::tryRunZeroStreamsAction, callback::failed); - writeControlFrame(frame, writeCallback); - } - return callback; + Callback.Completable result = new Callback.Completable(); + result.thenRun(this::tryRunZeroStreamsAction); + writeControlFrame(frame, result); + return result; } else { @@ -229,6 +217,19 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session return new GoAwayFrame(lastId.get()); } + public CompletableFuture shutdown() + { + CompletableFuture result; + try (AutoLock l = lock.lock()) + { + if (shutdown != null) + return shutdown; + shutdown = result = new Callback.Completable(); + } + goAway(true); + return result; + } + protected void updateLastId(long id) { Atomics.updateMax(lastId, id); @@ -629,6 +630,13 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session if (!confirmed) return false; + disconnect("idle_timeout"); + + return false; + } + + public void disconnect(String reason) + { GoAwayFrame goAwayFrame = null; try (AutoLock l = lock.lock()) { @@ -646,7 +654,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session } case CLOSED: { - return false; + return; } default: { @@ -655,14 +663,12 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session } } - failStreams(stream -> true, "session_idle_timeout", true); + failStreams(stream -> true, reason, true); if (goAwayFrame != null) - writeControlFrame(goAwayFrame, Callback.from(() -> terminate("idle_timeout"))); + writeControlFrame(goAwayFrame, Callback.from(() -> terminate(reason))); else - terminate("idle_timeout"); - - return false; + terminate(reason); } private void failStreams(Predicate predicate, String reason, boolean close) @@ -690,12 +696,19 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session // Since the close() above is called by the // implementation, notify the application. notifyDisconnect(); + // Notify the shutdown completable. + CompletableFuture shutdown; + try (AutoLock l = lock.lock()) + { + shutdown = this.shutdown; + } + if (shutdown != null) + shutdown.complete(null); } private void tryRunZeroStreamsAction() { Runnable action = null; - CompletableFuture completable; try (AutoLock l = lock.lock()) { long count = streamCount.get(); @@ -706,8 +719,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session return; } - completable = shutdown; - switch (closeState) { case LOCALLY_CLOSED: @@ -753,9 +764,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session LOG.debug("executing zero streams action on {}", this); action.run(); } - - if (completable != null) - completable.complete(null); } public void onClose(long error, String reason) diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index 50bafcb76b5..0b7791b4e93 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.server.internal; import java.util.ArrayDeque; import java.util.Map; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.frames.Frame; @@ -193,8 +194,13 @@ public class ServerHTTP3Session extends ServerProtocolSession { if (LOG.isDebugEnabled()) LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this); - // TODO: maybe we should be harsher here... like halt() see onIdleTimeout() - session.goAway(false); + session.disconnect(reason); + } + + @Override + public CompletableFuture shutdown() + { + return session.shutdown(); } @Override diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java index be689457c9f..a9ac309f96b 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/GoAwayTest.java @@ -13,10 +13,12 @@ package org.eclipse.jetty.http3.tests; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.http.HttpFields; @@ -31,12 +33,12 @@ import org.eclipse.jetty.http3.frames.GoAwayFrame; import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.internal.HTTP3Session; +import org.eclipse.jetty.http3.internal.HTTP3Stream; import org.eclipse.jetty.http3.server.internal.HTTP3SessionServer; import org.eclipse.jetty.quic.client.ClientQuicSession; import org.eclipse.jetty.quic.common.QuicConnection; import org.eclipse.jetty.quic.server.ServerQuicSession; import org.eclipse.jetty.util.BufferUtil; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import static org.awaitility.Awaitility.await; @@ -656,7 +658,7 @@ public class GoAwayTest extends AbstractClientServerTest } @Test - public void testClientShutdownServerCloses() throws Exception + public void testClientDisconnectServerCloses() throws Exception { AtomicReference serverSessionRef = new AtomicReference<>(); CountDownLatch settingsLatch = new CountDownLatch(2); @@ -695,7 +697,7 @@ public class GoAwayTest extends AbstractClientServerTest assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); - // Issue a network close. + // Issue a network disconnection. clientSession.getProtocolSession().getQuicSession().getQuicConnection().close(); assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS)); @@ -706,7 +708,7 @@ public class GoAwayTest extends AbstractClientServerTest } @Test - public void testServerGracefulGoAwayClientShutdownServerCloses() throws Exception + public void testServerGracefulGoAwayClientDisconnectServerCloses() throws Exception { AtomicReference serverSessionRef = new AtomicReference<>(); CountDownLatch settingsLatch = new CountDownLatch(2); @@ -739,7 +741,7 @@ public class GoAwayTest extends AbstractClientServerTest @Override public void onGoAway(Session session, GoAwayFrame frame) { - // Reply to the graceful GOAWAY from the server with a network close. + // Reply to the graceful GOAWAY from the server with a network disconnection. ((HTTP3Session)session).getProtocolSession().getQuicSession().getQuicConnection().close(); } @@ -762,6 +764,67 @@ public class GoAwayTest extends AbstractClientServerTest assertTrue(((HTTP3Session)clientSession).isClosed()); } + @Test + public void testClientIdleTimeout() throws Exception + { + long idleTimeout = 1000; + + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverDisconnectLatch = new CountDownLatch(1); + start(new Session.Server.Listener() + { + @Override + public void onAccept(Session session) + { + serverSessionRef.set((HTTP3Session)session); + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onDisconnect(Session session) + { + serverDisconnectLatch.countDown(); + } + }); + client.getClientConnector().setIdleTimeout(Duration.ofMillis(idleTimeout)); + + CountDownLatch clientIdleTimeoutLatch = new CountDownLatch(1); + CountDownLatch clientDisconnectLatch = new CountDownLatch(1); + HTTP3Session clientSession = (HTTP3Session)newSession(new Session.Client.Listener() + { + @Override + public boolean onIdleTimeout(Session session) + { + clientIdleTimeoutLatch.countDown(); + return true; + } + + @Override + public void onDisconnect(Session session) + { + clientDisconnectLatch.countDown(); + } + }); + + assertTrue(clientIdleTimeoutLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // Client should send a GOAWAY to the server, which should reply. + assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS)); + + HTTP3Session serverSession = serverSessionRef.get(); + assertTrue(serverSession.isClosed()); + assertTrue(clientSession.isClosed()); + + await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getProtocolSession().getQuicSession().getQuicConnection().getEndPoint().isOpen(), is(false)); + } + @Test public void testServerIdleTimeout() throws Exception { @@ -769,7 +832,6 @@ public class GoAwayTest extends AbstractClientServerTest AtomicReference serverSessionRef = new AtomicReference<>(); CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); - CountDownLatch serverGoAwayLatch = new CountDownLatch(1); CountDownLatch serverDisconnectLatch = new CountDownLatch(1); start(new Session.Server.Listener() { @@ -786,12 +848,6 @@ public class GoAwayTest extends AbstractClientServerTest return true; } - @Override - public void onGoAway(Session session, GoAwayFrame frame) - { - serverGoAwayLatch.countDown(); - } - @Override public void onDisconnect(Session session) { @@ -1004,7 +1060,7 @@ public class GoAwayTest extends AbstractClientServerTest } @Test - public void testServerGoAwayWithStreamsThenShutdown() throws Exception + public void testServerGoAwayWithStreamsThenDisconnect() throws Exception { AtomicReference serverSessionRef = new AtomicReference<>(); CountDownLatch serverGoAwayLatch = new CountDownLatch(1); @@ -1063,7 +1119,7 @@ public class GoAwayTest extends AbstractClientServerTest assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); // Neither the client nor the server are finishing - // the pending stream, so force the close on the server. + // the pending stream, so force the disconnect on the server. HTTP3Session serverSession = (HTTP3Session)serverSessionRef.get(); serverSession.getProtocolSession().getQuicSession().getQuicConnection().close(); @@ -1075,4 +1131,224 @@ public class GoAwayTest extends AbstractClientServerTest assertTrue(serverSession.isClosed()); assertTrue(clientSession.isClosed()); } + + @Test + public void testClientStop() throws Exception + { + CountDownLatch settingsLatch = new CountDownLatch(2); + CountDownLatch serverGoAwayLatch = new CountDownLatch(1); + CountDownLatch serverDisconnectLatch = new CountDownLatch(1); + start(new Session.Server.Listener() + { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + settingsLatch.countDown(); + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + serverGoAwayLatch.countDown(); + } + + @Override + public void onDisconnect(Session session) + { + serverDisconnectLatch.countDown(); + } + }); + + CountDownLatch clientDisconnectLatch = new CountDownLatch(1); + HTTP3Session clientSession = (HTTP3Session)newSession(new Session.Client.Listener() + { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + settingsLatch.countDown(); + } + + @Override + public void onDisconnect(Session session) + { + clientDisconnectLatch.countDown(); + } + }); + + assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); + + client.stop(); + + assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS)); + + await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getProtocolSession().getQuicSession().getQuicConnection().getEndPoint().isOpen(), is(false)); + } + + @Test + public void testServerStop() throws Exception + { + AtomicReference serverSessionRef = new AtomicReference<>(); + CountDownLatch settingsLatch = new CountDownLatch(2); + CountDownLatch serverDisconnectLatch = new CountDownLatch(1); + start(new Session.Server.Listener() + { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + serverSessionRef.set((HTTP3Session)session); + settingsLatch.countDown(); + } + + @Override + public void onDisconnect(Session session) + { + serverDisconnectLatch.countDown(); + } + }); + + CountDownLatch clientGoAwayLatch = new CountDownLatch(1); + CountDownLatch clientDisconnectLatch = new CountDownLatch(1); + HTTP3Session clientSession = (HTTP3Session)newSession(new Session.Client.Listener() + { + @Override + public void onSettings(Session session, SettingsFrame frame) + { + settingsLatch.countDown(); + } + + @Override + public void onGoAway(Session session, GoAwayFrame frame) + { + clientGoAwayLatch.countDown(); + } + + @Override + public void onDisconnect(Session session) + { + clientDisconnectLatch.countDown(); + } + }); + + assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); + + server.stop(); + + assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS)); + + await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getProtocolSession().getQuicSession().getQuicConnection().getEndPoint().isOpen(), is(false)); + await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getProtocolSession().getQuicSession().getQuicConnection().getEndPoint().isOpen(), is(false)); + } + + @Test + public void testClientShutdown() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + start(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + serverStreamRef.set((HTTP3Stream)stream); + stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false)); + return null; + } + }); + + CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch dataLatch = new CountDownLatch(1); + HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener() {}); + clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener() + { + @Override + public void onResponse(Stream stream, HeadersFrame frame) + { + responseLatch.countDown(); + stream.demand(); + } + + @Override + public void onDataAvailable(Stream stream) + { + Stream.Data data = stream.readData(); + if (data != null) + { + data.complete(); + if (data.isLast()) + dataLatch.countDown(); + } + stream.demand(); + } + }); + + assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + + CompletableFuture shutdown = client.shutdown(); + + // Shutdown must not complete yet. + assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS)); + + // Complete the response. + serverStreamRef.get().data(new DataFrame(BufferUtil.EMPTY_BUFFER, true)); + + assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + shutdown.get(5, TimeUnit.SECONDS); + } + + @Test + public void testServerShutdown() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + start(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + serverStreamRef.set((HTTP3Stream)stream); + stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false)); + return null; + } + }); + + CountDownLatch responseLatch = new CountDownLatch(1); + CountDownLatch dataLatch = new CountDownLatch(1); + HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener() {}); + clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener() + { + @Override + public void onResponse(Stream stream, HeadersFrame frame) + { + responseLatch.countDown(); + stream.demand(); + } + + @Override + public void onDataAvailable(Stream stream) + { + Stream.Data data = stream.readData(); + if (data != null) + { + data.complete(); + if (data.isLast()) + dataLatch.countDown(); + } + stream.demand(); + } + }); + + assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); + + CompletableFuture shutdown = connector.shutdown(); + // Shutdown must not complete yet. + assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS)); + + // Complete the response. + serverStreamRef.get().data(new DataFrame(BufferUtil.EMPTY_BUFFER, true)); + + assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + shutdown.get(5, TimeUnit.SECONDS); + } } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java index aa7e3933100..c75451e003e 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java @@ -15,6 +15,7 @@ package org.eclipse.jetty.quic.common; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -130,7 +131,7 @@ public abstract class ProtocolSession extends ContainerLifeCycle public void inwardClose(long error, String reason) { - getQuicSession().outwardClose(error, reason); + outwardClose(error, reason); } public void outwardClose(long error, String reason) @@ -138,6 +139,12 @@ public abstract class ProtocolSession extends ContainerLifeCycle getQuicSession().outwardClose(error, reason); } + public CompletableFuture shutdown() + { + outwardClose(0x0, "shutdown"); + return CompletableFuture.completedFuture(null); + } + protected abstract void onClose(long error, String reason); @Override diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index 129989df0d1..9dcc3e7a1b4 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -16,12 +16,11 @@ package org.eclipse.jetty.quic.common; import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collection; import java.util.EventListener; import java.util.List; -import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -40,10 +39,7 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.DumpableCollection; -import org.eclipse.jetty.util.thread.AutoLock; -import org.eclipse.jetty.util.thread.ExecutionStrategy; import org.eclipse.jetty.util.thread.Scheduler; -import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,8 +58,6 @@ public abstract class QuicSession extends ContainerLifeCycle private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class); private final AtomicLong[] ids = new AtomicLong[StreamType.values().length]; - private final AutoLock strategyQueueLock = new AutoLock(); - private final Queue strategyQueue = new ArrayDeque<>(); private final ConcurrentMap endPoints = new ConcurrentHashMap<>(); private final Executor executor; private final Scheduler scheduler; @@ -71,9 +65,8 @@ public abstract class QuicSession extends ContainerLifeCycle private final QuicheConnection quicheConnection; private final QuicConnection connection; private final Flusher flusher; - private final ExecutionStrategy strategy; private SocketAddress remoteAddress; - private ProtocolSession protocolSession; + private volatile ProtocolSession protocolSession; private QuicheConnectionId quicheConnectionId; private long idleTimeout; @@ -86,8 +79,6 @@ public abstract class QuicSession extends ContainerLifeCycle this.connection = connection; this.flusher = new Flusher(scheduler); addBean(flusher); - this.strategy = new AdaptiveExecutionStrategy(new Producer(), executor); - addBean(strategy); this.remoteAddress = remoteAddress; Arrays.setAll(ids, i -> new AtomicLong()); } @@ -136,6 +127,14 @@ public abstract class QuicSession extends ContainerLifeCycle } } + public CompletableFuture shutdown() + { + ProtocolSession session = this.protocolSession; + if (session != null) + return session.shutdown(); + return CompletableFuture.completedFuture(null); + } + public Executor getExecutor() { return executor; @@ -323,12 +322,13 @@ public abstract class QuicSession extends ContainerLifeCycle // H3ProtoSession - QpackDecoder // H3ProtoSession -* request streams - if (protocolSession == null) + ProtocolSession session = protocolSession; + if (session == null) { - protocolSession = createProtocolSession(); - addManaged(protocolSession); + protocolSession = session = createProtocolSession(); + addManaged(session); } - protocolSession.process(); + session.process(); } else { @@ -355,15 +355,6 @@ public abstract class QuicSession extends ContainerLifeCycle public abstract Connection newConnection(QuicStreamEndPoint endPoint); - private void dispatch(Runnable runnable) - { - try (AutoLock l = strategyQueueLock.lock()) - { - strategyQueue.offer(runnable); - } - strategy.dispatch(); - } - public void flush() { if (LOG.isDebugEnabled()) @@ -397,6 +388,7 @@ public abstract class QuicSession extends ContainerLifeCycle public void inwardClose(long error, String reason) { protocolSession.inwardClose(error, reason); + flush(); } public void outwardClose(long error, String reason) @@ -404,8 +396,7 @@ public abstract class QuicSession extends ContainerLifeCycle if (LOG.isDebugEnabled()) LOG.debug("outward closing 0x{}/{} on {}", Long.toHexString(error), reason, this); quicheConnection.close(error, reason); - // Flushing will eventually forward - // the outward close to the connection. + // Flushing will eventually forward the outward close to the connection. flush(); } @@ -454,7 +445,7 @@ public abstract class QuicSession extends ContainerLifeCycle if (LOG.isDebugEnabled()) LOG.debug("re-iterating after quiche timeout {}", QuicSession.this); // Do not use the timer thread to iterate. - dispatch(() -> iterate()); + getExecutor().execute(() -> iterate()); } }; } @@ -530,18 +521,6 @@ public abstract class QuicSession extends ContainerLifeCycle } } - private class Producer implements ExecutionStrategy.Producer - { - @Override - public Runnable produce() - { - try (AutoLock l = strategyQueueLock.lock()) - { - return strategyQueue.poll(); - } - } - } - public interface Listener extends EventListener { public default void onOpened(QuicSession session) diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSessionContainer.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSessionContainer.java index d1110929bc1..3ea04915cc2 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSessionContainer.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSessionContainer.java @@ -15,15 +15,19 @@ package org.eclipse.jetty.quic.common; import java.io.IOException; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.DumpableCollection; +import org.eclipse.jetty.util.component.Graceful; -public class QuicSessionContainer extends AbstractLifeCycle implements QuicSession.Listener, Dumpable +public class QuicSessionContainer extends AbstractLifeCycle implements QuicSession.Listener, Graceful, Dumpable { private final Set sessions = ConcurrentHashMap.newKeySet(); + private final AtomicReference> shutdown = new AtomicReference<>(); @Override public void onOpened(QuicSession session) @@ -37,6 +41,35 @@ public class QuicSessionContainer extends AbstractLifeCycle implements QuicSessi sessions.remove(session); } + @Override + public CompletableFuture shutdown() + { + CompletableFuture result = new CompletableFuture<>(); + CompletableFuture existing = shutdown.compareAndExchange(null, result); + if (existing == null) + { + CompletableFuture.allOf(sessions.stream().map(QuicSession::shutdown).toArray(CompletableFuture[]::new)) + .whenComplete((v, x) -> + { + if (x == null) + result.complete(v); + else + result.completeExceptionally(x); + }); + return result; + } + else + { + return existing; + } + } + + @Override + public boolean isShutdown() + { + return shutdown.get() != null; + } + @Override public String dump() { diff --git a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/QuicServerConnector.java b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/QuicServerConnector.java index f979543d209..944cbebc53f 100644 --- a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/QuicServerConnector.java +++ b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/QuicServerConnector.java @@ -21,6 +21,7 @@ import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.util.EventListener; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import org.eclipse.jetty.io.ByteBufferPool; @@ -199,23 +200,21 @@ public class QuicServerConnector extends AbstractNetworkConnector @Override protected void doStop() throws Exception { + // We want the DatagramChannel to be stopped by the SelectorManager. super.doStop(); + + removeBean(datagramChannel); + datagramChannel = null; + localPort = -2; + for (EventListener l : getBeans(EventListener.class)) selectorManager.removeEventListener(l); } @Override - public void close() + public CompletableFuture shutdown() { - super.close(); - DatagramChannel datagramChannel = this.datagramChannel; - this.datagramChannel = null; - if (datagramChannel != null) - { - removeBean(datagramChannel); - IO.close(datagramChannel); - } - localPort = -2; + return container.shutdown(); } @Override