Issue #6728 - QUIC and HTTP/3

- Implemented graceful shutdown functionality.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-13 11:53:34 +02:00
parent 392d59e9f7
commit 9a84bbbb71
9 changed files with 417 additions and 98 deletions

View File

@ -162,4 +162,9 @@ public class HTTP3Client extends ContainerLifeCycle
} }
return connection; return connection;
} }
public CompletableFuture<Void> shutdown()
{
return container.shutdown();
}
} }

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.http3.client.internal; package org.eclipse.jetty.http3.client.internal;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
@ -167,8 +168,13 @@ public class ClientHTTP3Session extends ClientProtocolSession
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this); LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
// TODO: maybe we should be harsher here... see onIdleTimeout() session.disconnect(reason);
session.goAway(false); }
@Override
public CompletableFuture<Void> shutdown()
{
return session.shutdown();
} }
@Override @Override

View File

@ -45,7 +45,6 @@ import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -66,7 +65,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
private GoAwayFrame goAwaySent; private GoAwayFrame goAwaySent;
private GoAwayFrame goAwayRecv; private GoAwayFrame goAwayRecv;
private Runnable zeroStreamsAction; private Runnable zeroStreamsAction;
private Callback.Completable shutdown; private CompletableFuture<Void> shutdown;
public HTTP3Session(ProtocolSession session, Session.Listener listener) public HTTP3Session(ProtocolSession session, Session.Listener listener)
{ {
@ -121,7 +120,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
boolean failStreams = false; boolean failStreams = false;
boolean sendGoAway = false; boolean sendGoAway = false;
Callback.Completable callback = null;
try (AutoLock l = lock.lock()) try (AutoLock l = lock.lock())
{ {
switch (closeState) switch (closeState)
@ -135,7 +133,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
{ {
// Send the non-graceful GOAWAY when the last stream is destroyed. // Send the non-graceful GOAWAY when the last stream is destroyed.
zeroStreamsAction = () -> goAway(false); zeroStreamsAction = () -> goAway(false);
shutdown = callback = new Callback.Completable();
} }
break; break;
} }
@ -170,7 +167,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
{ {
// Send the non-graceful GOAWAY when the last stream is destroyed. // Send the non-graceful GOAWAY when the last stream is destroyed.
zeroStreamsAction = () -> goAway(false); zeroStreamsAction = () -> goAway(false);
shutdown = callback = new Callback.Completable();
} }
else else
{ {
@ -203,18 +199,10 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
if (sendGoAway) if (sendGoAway)
{ {
if (callback == null) Callback.Completable result = new Callback.Completable();
{ result.thenRun(this::tryRunZeroStreamsAction);
callback = new Callback.Completable(); writeControlFrame(frame, result);
callback.thenRun(this::tryRunZeroStreamsAction); return result;
writeControlFrame(frame, callback);
}
else
{
Callback writeCallback = Callback.from(Invocable.InvocationType.NON_BLOCKING, this::tryRunZeroStreamsAction, callback::failed);
writeControlFrame(frame, writeCallback);
}
return callback;
} }
else else
{ {
@ -229,6 +217,19 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
return new GoAwayFrame(lastId.get()); return new GoAwayFrame(lastId.get());
} }
public CompletableFuture<Void> shutdown()
{
CompletableFuture<Void> result;
try (AutoLock l = lock.lock())
{
if (shutdown != null)
return shutdown;
shutdown = result = new Callback.Completable();
}
goAway(true);
return result;
}
protected void updateLastId(long id) protected void updateLastId(long id)
{ {
Atomics.updateMax(lastId, id); Atomics.updateMax(lastId, id);
@ -629,6 +630,13 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
if (!confirmed) if (!confirmed)
return false; return false;
disconnect("idle_timeout");
return false;
}
public void disconnect(String reason)
{
GoAwayFrame goAwayFrame = null; GoAwayFrame goAwayFrame = null;
try (AutoLock l = lock.lock()) try (AutoLock l = lock.lock())
{ {
@ -646,7 +654,7 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
} }
case CLOSED: case CLOSED:
{ {
return false; return;
} }
default: default:
{ {
@ -655,14 +663,12 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
} }
} }
failStreams(stream -> true, "session_idle_timeout", true); failStreams(stream -> true, reason, true);
if (goAwayFrame != null) if (goAwayFrame != null)
writeControlFrame(goAwayFrame, Callback.from(() -> terminate("idle_timeout"))); writeControlFrame(goAwayFrame, Callback.from(() -> terminate(reason)));
else else
terminate("idle_timeout"); terminate(reason);
return false;
} }
private void failStreams(Predicate<HTTP3Stream> predicate, String reason, boolean close) private void failStreams(Predicate<HTTP3Stream> predicate, String reason, boolean close)
@ -690,12 +696,19 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
// Since the close() above is called by the // Since the close() above is called by the
// implementation, notify the application. // implementation, notify the application.
notifyDisconnect(); notifyDisconnect();
// Notify the shutdown completable.
CompletableFuture<Void> shutdown;
try (AutoLock l = lock.lock())
{
shutdown = this.shutdown;
}
if (shutdown != null)
shutdown.complete(null);
} }
private void tryRunZeroStreamsAction() private void tryRunZeroStreamsAction()
{ {
Runnable action = null; Runnable action = null;
CompletableFuture<Void> completable;
try (AutoLock l = lock.lock()) try (AutoLock l = lock.lock())
{ {
long count = streamCount.get(); long count = streamCount.get();
@ -706,8 +719,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
return; return;
} }
completable = shutdown;
switch (closeState) switch (closeState)
{ {
case LOCALLY_CLOSED: case LOCALLY_CLOSED:
@ -753,9 +764,6 @@ public abstract class HTTP3Session extends ContainerLifeCycle implements Session
LOG.debug("executing zero streams action on {}", this); LOG.debug("executing zero streams action on {}", this);
action.run(); action.run();
} }
if (completable != null)
completable.complete(null);
} }
public void onClose(long error, String reason) public void onClose(long error, String reason)

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.server.internal;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
@ -193,8 +194,13 @@ public class ServerHTTP3Session extends ServerProtocolSession
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this); LOG.debug("inward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
// TODO: maybe we should be harsher here... like halt() see onIdleTimeout() session.disconnect(reason);
session.goAway(false); }
@Override
public CompletableFuture<Void> shutdown()
{
return session.shutdown();
} }
@Override @Override

View File

@ -13,10 +13,12 @@
package org.eclipse.jetty.http3.tests; package org.eclipse.jetty.http3.tests;
import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
@ -31,12 +33,12 @@ import org.eclipse.jetty.http3.frames.GoAwayFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.frames.SettingsFrame; import org.eclipse.jetty.http3.frames.SettingsFrame;
import org.eclipse.jetty.http3.internal.HTTP3Session; import org.eclipse.jetty.http3.internal.HTTP3Session;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.http3.server.internal.HTTP3SessionServer; import org.eclipse.jetty.http3.server.internal.HTTP3SessionServer;
import org.eclipse.jetty.quic.client.ClientQuicSession; import org.eclipse.jetty.quic.client.ClientQuicSession;
import org.eclipse.jetty.quic.common.QuicConnection; import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.quic.server.ServerQuicSession; import org.eclipse.jetty.quic.server.ServerQuicSession;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await; import static org.awaitility.Awaitility.await;
@ -656,7 +658,7 @@ public class GoAwayTest extends AbstractClientServerTest
} }
@Test @Test
public void testClientShutdownServerCloses() throws Exception public void testClientDisconnectServerCloses() throws Exception
{ {
AtomicReference<Session> serverSessionRef = new AtomicReference<>(); AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch settingsLatch = new CountDownLatch(2); CountDownLatch settingsLatch = new CountDownLatch(2);
@ -695,7 +697,7 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS)); assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
// Issue a network close. // Issue a network disconnection.
clientSession.getProtocolSession().getQuicSession().getQuicConnection().close(); clientSession.getProtocolSession().getQuicSession().getQuicConnection().close();
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS)); assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
@ -706,7 +708,7 @@ public class GoAwayTest extends AbstractClientServerTest
} }
@Test @Test
public void testServerGracefulGoAwayClientShutdownServerCloses() throws Exception public void testServerGracefulGoAwayClientDisconnectServerCloses() throws Exception
{ {
AtomicReference<Session> serverSessionRef = new AtomicReference<>(); AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch settingsLatch = new CountDownLatch(2); CountDownLatch settingsLatch = new CountDownLatch(2);
@ -739,7 +741,7 @@ public class GoAwayTest extends AbstractClientServerTest
@Override @Override
public void onGoAway(Session session, GoAwayFrame frame) public void onGoAway(Session session, GoAwayFrame frame)
{ {
// Reply to the graceful GOAWAY from the server with a network close. // Reply to the graceful GOAWAY from the server with a network disconnection.
((HTTP3Session)session).getProtocolSession().getQuicSession().getQuicConnection().close(); ((HTTP3Session)session).getProtocolSession().getQuicSession().getQuicConnection().close();
} }
@ -762,6 +764,67 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(((HTTP3Session)clientSession).isClosed()); assertTrue(((HTTP3Session)clientSession).isClosed());
} }
@Test
public void testClientIdleTimeout() throws Exception
{
long idleTimeout = 1000;
AtomicReference<HTTP3Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public void onAccept(Session session)
{
serverSessionRef.set((HTTP3Session)session);
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
serverGoAwayLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
serverDisconnectLatch.countDown();
}
});
client.getClientConnector().setIdleTimeout(Duration.ofMillis(idleTimeout));
CountDownLatch clientIdleTimeoutLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
HTTP3Session clientSession = (HTTP3Session)newSession(new Session.Client.Listener()
{
@Override
public boolean onIdleTimeout(Session session)
{
clientIdleTimeoutLatch.countDown();
return true;
}
@Override
public void onDisconnect(Session session)
{
clientDisconnectLatch.countDown();
}
});
assertTrue(clientIdleTimeoutLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
// Client should send a GOAWAY to the server, which should reply.
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
HTTP3Session serverSession = serverSessionRef.get();
assertTrue(serverSession.isClosed());
assertTrue(clientSession.isClosed());
await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getProtocolSession().getQuicSession().getQuicConnection().getEndPoint().isOpen(), is(false));
}
@Test @Test
public void testServerIdleTimeout() throws Exception public void testServerIdleTimeout() throws Exception
{ {
@ -769,7 +832,6 @@ public class GoAwayTest extends AbstractClientServerTest
AtomicReference<Session> serverSessionRef = new AtomicReference<>(); AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1); CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener() start(new Session.Server.Listener()
{ {
@ -786,12 +848,6 @@ public class GoAwayTest extends AbstractClientServerTest
return true; return true;
} }
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
serverGoAwayLatch.countDown();
}
@Override @Override
public void onDisconnect(Session session) public void onDisconnect(Session session)
{ {
@ -1004,7 +1060,7 @@ public class GoAwayTest extends AbstractClientServerTest
} }
@Test @Test
public void testServerGoAwayWithStreamsThenShutdown() throws Exception public void testServerGoAwayWithStreamsThenDisconnect() throws Exception
{ {
AtomicReference<Session> serverSessionRef = new AtomicReference<>(); AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverGoAwayLatch = new CountDownLatch(1); CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
@ -1063,7 +1119,7 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
// Neither the client nor the server are finishing // Neither the client nor the server are finishing
// the pending stream, so force the close on the server. // the pending stream, so force the disconnect on the server.
HTTP3Session serverSession = (HTTP3Session)serverSessionRef.get(); HTTP3Session serverSession = (HTTP3Session)serverSessionRef.get();
serverSession.getProtocolSession().getQuicSession().getQuicConnection().close(); serverSession.getProtocolSession().getQuicSession().getQuicConnection().close();
@ -1075,4 +1131,224 @@ public class GoAwayTest extends AbstractClientServerTest
assertTrue(serverSession.isClosed()); assertTrue(serverSession.isClosed());
assertTrue(clientSession.isClosed()); assertTrue(clientSession.isClosed());
} }
@Test
public void testClientStop() throws Exception
{
CountDownLatch settingsLatch = new CountDownLatch(2);
CountDownLatch serverGoAwayLatch = new CountDownLatch(1);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
serverGoAwayLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
HTTP3Session clientSession = (HTTP3Session)newSession(new Session.Client.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
clientDisconnectLatch.countDown();
}
});
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
client.stop();
assertTrue(serverGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getProtocolSession().getQuicSession().getQuicConnection().getEndPoint().isOpen(), is(false));
}
@Test
public void testServerStop() throws Exception
{
AtomicReference<HTTP3Session> serverSessionRef = new AtomicReference<>();
CountDownLatch settingsLatch = new CountDownLatch(2);
CountDownLatch serverDisconnectLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
serverSessionRef.set((HTTP3Session)session);
settingsLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
serverDisconnectLatch.countDown();
}
});
CountDownLatch clientGoAwayLatch = new CountDownLatch(1);
CountDownLatch clientDisconnectLatch = new CountDownLatch(1);
HTTP3Session clientSession = (HTTP3Session)newSession(new Session.Client.Listener()
{
@Override
public void onSettings(Session session, SettingsFrame frame)
{
settingsLatch.countDown();
}
@Override
public void onGoAway(Session session, GoAwayFrame frame)
{
clientGoAwayLatch.countDown();
}
@Override
public void onDisconnect(Session session)
{
clientDisconnectLatch.countDown();
}
});
assertTrue(settingsLatch.await(5, TimeUnit.SECONDS));
server.stop();
assertTrue(clientGoAwayLatch.await(5, TimeUnit.SECONDS));
assertTrue(clientDisconnectLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverDisconnectLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getProtocolSession().getQuicSession().getQuicConnection().getEndPoint().isOpen(), is(false));
await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getProtocolSession().getQuicSession().getQuicConnection().getEndPoint().isOpen(), is(false));
}
@Test
public void testClientShutdown() throws Exception
{
AtomicReference<HTTP3Stream> serverStreamRef = new AtomicReference<>();
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverStreamRef.set((HTTP3Stream)stream);
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
return null;
}
});
CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener() {});
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
responseLatch.countDown();
stream.demand();
}
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
if (data != null)
{
data.complete();
if (data.isLast())
dataLatch.countDown();
}
stream.demand();
}
});
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
CompletableFuture<Void> shutdown = client.shutdown();
// Shutdown must not complete yet.
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));
// Complete the response.
serverStreamRef.get().data(new DataFrame(BufferUtil.EMPTY_BUFFER, true));
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
shutdown.get(5, TimeUnit.SECONDS);
}
@Test
public void testServerShutdown() throws Exception
{
AtomicReference<HTTP3Stream> serverStreamRef = new AtomicReference<>();
start(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
serverStreamRef.set((HTTP3Stream)stream);
stream.respond(new HeadersFrame(new MetaData.Response(HttpVersion.HTTP_3, HttpStatus.OK_200, HttpFields.EMPTY), false));
return null;
}
});
CountDownLatch responseLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
HTTP3SessionClient clientSession = (HTTP3SessionClient)newSession(new Session.Client.Listener() {});
clientSession.newRequest(new HeadersFrame(newRequest("/"), true), new Stream.Listener()
{
@Override
public void onResponse(Stream stream, HeadersFrame frame)
{
responseLatch.countDown();
stream.demand();
}
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
if (data != null)
{
data.complete();
if (data.isLast())
dataLatch.countDown();
}
stream.demand();
}
});
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
CompletableFuture<Void> shutdown = connector.shutdown();
// Shutdown must not complete yet.
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));
// Complete the response.
serverStreamRef.get().data(new DataFrame(BufferUtil.EMPTY_BUFFER, true));
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
shutdown.get(5, TimeUnit.SECONDS);
}
} }

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.quic.common;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -130,7 +131,7 @@ public abstract class ProtocolSession extends ContainerLifeCycle
public void inwardClose(long error, String reason) public void inwardClose(long error, String reason)
{ {
getQuicSession().outwardClose(error, reason); outwardClose(error, reason);
} }
public void outwardClose(long error, String reason) public void outwardClose(long error, String reason)
@ -138,6 +139,12 @@ public abstract class ProtocolSession extends ContainerLifeCycle
getQuicSession().outwardClose(error, reason); getQuicSession().outwardClose(error, reason);
} }
public CompletableFuture<Void> shutdown()
{
outwardClose(0x0, "shutdown");
return CompletableFuture.completedFuture(null);
}
protected abstract void onClose(long error, String reason); protected abstract void onClose(long error, String reason);
@Override @Override

View File

@ -16,12 +16,11 @@ package org.eclipse.jetty.quic.common;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.EventListener; import java.util.EventListener;
import java.util.List; import java.util.List;
import java.util.Queue; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -40,10 +39,7 @@ import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -62,8 +58,6 @@ public abstract class QuicSession extends ContainerLifeCycle
private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class); private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class);
private final AtomicLong[] ids = new AtomicLong[StreamType.values().length]; private final AtomicLong[] ids = new AtomicLong[StreamType.values().length];
private final AutoLock strategyQueueLock = new AutoLock();
private final Queue<Runnable> strategyQueue = new ArrayDeque<>();
private final ConcurrentMap<Long, QuicStreamEndPoint> endPoints = new ConcurrentHashMap<>(); private final ConcurrentMap<Long, QuicStreamEndPoint> endPoints = new ConcurrentHashMap<>();
private final Executor executor; private final Executor executor;
private final Scheduler scheduler; private final Scheduler scheduler;
@ -71,9 +65,8 @@ public abstract class QuicSession extends ContainerLifeCycle
private final QuicheConnection quicheConnection; private final QuicheConnection quicheConnection;
private final QuicConnection connection; private final QuicConnection connection;
private final Flusher flusher; private final Flusher flusher;
private final ExecutionStrategy strategy;
private SocketAddress remoteAddress; private SocketAddress remoteAddress;
private ProtocolSession protocolSession; private volatile ProtocolSession protocolSession;
private QuicheConnectionId quicheConnectionId; private QuicheConnectionId quicheConnectionId;
private long idleTimeout; private long idleTimeout;
@ -86,8 +79,6 @@ public abstract class QuicSession extends ContainerLifeCycle
this.connection = connection; this.connection = connection;
this.flusher = new Flusher(scheduler); this.flusher = new Flusher(scheduler);
addBean(flusher); addBean(flusher);
this.strategy = new AdaptiveExecutionStrategy(new Producer(), executor);
addBean(strategy);
this.remoteAddress = remoteAddress; this.remoteAddress = remoteAddress;
Arrays.setAll(ids, i -> new AtomicLong()); Arrays.setAll(ids, i -> new AtomicLong());
} }
@ -136,6 +127,14 @@ public abstract class QuicSession extends ContainerLifeCycle
} }
} }
public CompletableFuture<Void> shutdown()
{
ProtocolSession session = this.protocolSession;
if (session != null)
return session.shutdown();
return CompletableFuture.completedFuture(null);
}
public Executor getExecutor() public Executor getExecutor()
{ {
return executor; return executor;
@ -323,12 +322,13 @@ public abstract class QuicSession extends ContainerLifeCycle
// H3ProtoSession - QpackDecoder // H3ProtoSession - QpackDecoder
// H3ProtoSession -* request streams // H3ProtoSession -* request streams
if (protocolSession == null) ProtocolSession session = protocolSession;
if (session == null)
{ {
protocolSession = createProtocolSession(); protocolSession = session = createProtocolSession();
addManaged(protocolSession); addManaged(session);
} }
protocolSession.process(); session.process();
} }
else else
{ {
@ -355,15 +355,6 @@ public abstract class QuicSession extends ContainerLifeCycle
public abstract Connection newConnection(QuicStreamEndPoint endPoint); public abstract Connection newConnection(QuicStreamEndPoint endPoint);
private void dispatch(Runnable runnable)
{
try (AutoLock l = strategyQueueLock.lock())
{
strategyQueue.offer(runnable);
}
strategy.dispatch();
}
public void flush() public void flush()
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -397,6 +388,7 @@ public abstract class QuicSession extends ContainerLifeCycle
public void inwardClose(long error, String reason) public void inwardClose(long error, String reason)
{ {
protocolSession.inwardClose(error, reason); protocolSession.inwardClose(error, reason);
flush();
} }
public void outwardClose(long error, String reason) public void outwardClose(long error, String reason)
@ -404,8 +396,7 @@ public abstract class QuicSession extends ContainerLifeCycle
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("outward closing 0x{}/{} on {}", Long.toHexString(error), reason, this); LOG.debug("outward closing 0x{}/{} on {}", Long.toHexString(error), reason, this);
quicheConnection.close(error, reason); quicheConnection.close(error, reason);
// Flushing will eventually forward // Flushing will eventually forward the outward close to the connection.
// the outward close to the connection.
flush(); flush();
} }
@ -454,7 +445,7 @@ public abstract class QuicSession extends ContainerLifeCycle
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("re-iterating after quiche timeout {}", QuicSession.this); LOG.debug("re-iterating after quiche timeout {}", QuicSession.this);
// Do not use the timer thread to iterate. // Do not use the timer thread to iterate.
dispatch(() -> iterate()); getExecutor().execute(() -> iterate());
} }
}; };
} }
@ -530,18 +521,6 @@ public abstract class QuicSession extends ContainerLifeCycle
} }
} }
private class Producer implements ExecutionStrategy.Producer
{
@Override
public Runnable produce()
{
try (AutoLock l = strategyQueueLock.lock())
{
return strategyQueue.poll();
}
}
}
public interface Listener extends EventListener public interface Listener extends EventListener
{ {
public default void onOpened(QuicSession session) public default void onOpened(QuicSession session)

View File

@ -15,15 +15,19 @@ package org.eclipse.jetty.quic.common;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.component.Graceful;
public class QuicSessionContainer extends AbstractLifeCycle implements QuicSession.Listener, Dumpable public class QuicSessionContainer extends AbstractLifeCycle implements QuicSession.Listener, Graceful, Dumpable
{ {
private final Set<QuicSession> sessions = ConcurrentHashMap.newKeySet(); private final Set<QuicSession> sessions = ConcurrentHashMap.newKeySet();
private final AtomicReference<CompletableFuture<Void>> shutdown = new AtomicReference<>();
@Override @Override
public void onOpened(QuicSession session) public void onOpened(QuicSession session)
@ -37,6 +41,35 @@ public class QuicSessionContainer extends AbstractLifeCycle implements QuicSessi
sessions.remove(session); sessions.remove(session);
} }
@Override
public CompletableFuture<Void> shutdown()
{
CompletableFuture<Void> result = new CompletableFuture<>();
CompletableFuture<Void> existing = shutdown.compareAndExchange(null, result);
if (existing == null)
{
CompletableFuture.allOf(sessions.stream().map(QuicSession::shutdown).toArray(CompletableFuture[]::new))
.whenComplete((v, x) ->
{
if (x == null)
result.complete(v);
else
result.completeExceptionally(x);
});
return result;
}
else
{
return existing;
}
}
@Override
public boolean isShutdown()
{
return shutdown.get() != null;
}
@Override @Override
public String dump() public String dump()
{ {

View File

@ -21,6 +21,7 @@ import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.util.EventListener; import java.util.EventListener;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
@ -199,23 +200,21 @@ public class QuicServerConnector extends AbstractNetworkConnector
@Override @Override
protected void doStop() throws Exception protected void doStop() throws Exception
{ {
// We want the DatagramChannel to be stopped by the SelectorManager.
super.doStop(); super.doStop();
removeBean(datagramChannel);
datagramChannel = null;
localPort = -2;
for (EventListener l : getBeans(EventListener.class)) for (EventListener l : getBeans(EventListener.class))
selectorManager.removeEventListener(l); selectorManager.removeEventListener(l);
} }
@Override @Override
public void close() public CompletableFuture<Void> shutdown()
{ {
super.close(); return container.shutdown();
DatagramChannel datagramChannel = this.datagramChannel;
this.datagramChannel = null;
if (datagramChannel != null)
{
removeBean(datagramChannel);
IO.close(datagramChannel);
}
localPort = -2;
} }
@Override @Override