diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java index aa1ad88a30d..13706afe443 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/HTTP3SessionClient.java @@ -66,6 +66,8 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client if (LOG.isDebugEnabled()) LOG.debug("received response {}#{} on {}", frame, streamId, this); stream.processResponse(frame); + if (frame.isLast()) + removeStream(stream); } else { diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java index 26e3677e689..f2f3d472e08 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Session.java @@ -14,6 +14,8 @@ package org.eclipse.jetty.http3.api; import java.net.SocketAddress; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -57,6 +59,14 @@ public interface Session return false; } + /** + * @return a snapshot of all the streams currently belonging to this session + */ + public default Collection getStreams() + { + return Collections.emptyList(); + } + /** *

The client-side HTTP/3 API representing a connection with a server.

*

Once a {@link Session} has been obtained, it can be used to make HTTP/3 requests:

diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java index b7e298a6a7d..ab16e35a5b2 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Session.java @@ -14,13 +14,16 @@ package org.eclipse.jetty.http3.internal; import java.net.SocketAddress; +import java.util.Collection; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Session; +import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.HeadersFrame; @@ -86,6 +89,12 @@ public abstract class HTTP3Session implements Session, ParserListener return closeState != CloseState.NOT_CLOSED; } + @Override + public Collection getStreams() + { + return List.copyOf(streams.values()); + } + public void close(long error, String reason) { getProtocolSession().close(error, reason); @@ -109,10 +118,12 @@ public abstract class HTTP3Session implements Session, ParserListener protected HTTP3Stream createStream(QuicStreamEndPoint endPoint) { long streamId = endPoint.getStreamId(); - HTTP3Stream stream = newHTTP3Stream(endPoint); - if (streams.put(streamId, stream) != null) - throw new IllegalStateException("duplicate stream id " + streamId); - return stream; + return streams.compute(streamId, (id, stream) -> + { + if (stream != null) + throw new IllegalStateException("duplicate stream id " + streamId); + return newHTTP3Stream(endPoint); + }); } protected HTTP3Stream getOrCreateStream(QuicStreamEndPoint endPoint) @@ -130,6 +141,8 @@ public abstract class HTTP3Session implements Session, ParserListener if (idleTimeout > 0) stream.setIdleTimeout(idleTimeout); } + if (LOG.isDebugEnabled()) + LOG.debug("created {} on {}", stream, this); return stream; } @@ -138,6 +151,16 @@ public abstract class HTTP3Session implements Session, ParserListener return streams.get(streamId); } + public void removeStream(HTTP3Stream stream) + { + boolean removed = streams.remove(stream.getId()) != null; + if (removed) + { + if (LOG.isDebugEnabled()) + LOG.debug("destroyed {} on {}", stream, this); + } + } + public abstract void writeFrame(long streamId, Frame frame, Callback callback); public Map onPreface() @@ -196,6 +219,7 @@ public abstract class HTTP3Session implements Session, ParserListener if (LOG.isDebugEnabled()) LOG.debug("received trailer {}#{} on {}", frame, streamId, this); stream.processTrailer(frame); + removeStream(stream); } } @@ -232,7 +256,10 @@ public abstract class HTTP3Session implements Session, ParserListener LOG.debug("stream failure {}/{} for stream #{} on {}", error, failure, streamId, this, failure); HTTP3Stream stream = getStream(streamId); if (stream != null) + { stream.processFailure(error, failure); + removeStream(stream); + } } @Override @@ -282,7 +309,9 @@ public abstract class HTTP3Session implements Session, ParserListener @Override protected boolean onExpired(HTTP3Stream stream) { - stream.processIdleTimeout(new TimeoutException("idle timeout " + stream.getIdleTimeout() + " ms elapsed")); + if (stream.processIdleTimeout(new TimeoutException("idle timeout " + stream.getIdleTimeout() + " ms elapsed"))) + removeStream(stream); + // The iterator returned from the method above does not support removal. return false; } } diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java index 291dda193b8..afffa6b3687 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3Stream.java @@ -97,12 +97,14 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable expireNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(idleTimeout); } - void processIdleTimeout(TimeoutException timeout) + boolean processIdleTimeout(TimeoutException timeout) { if (LOG.isDebugEnabled()) LOG.debug("idle timeout {} ms expired on {}", getIdleTimeout(), this); - if (notifyIdleTimeout(timeout)) + boolean close = notifyIdleTimeout(timeout); + if (close) endPoint.close(ErrorCode.REQUEST_CANCELLED_ERROR.code(), timeout); + return close; } @Override @@ -120,8 +122,16 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable @Override public Data readData() { - HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection(); - return connection.readData(); + try + { + HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection(); + return connection.readData(); + } + catch (Throwable x) + { + session.removeStream(this); + throw x; + } } @Override diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java index a9f4adaeca2..c9d4aee480d 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HTTP3SessionServer.java @@ -63,6 +63,8 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server if (LOG.isDebugEnabled()) LOG.debug("received request {}#{} on {}", frame, streamId, this); stream.processRequest(frame); + if (frame.isLast()) + removeStream(stream); } else { diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/StreamIdleTimeoutTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/StreamIdleTimeoutTest.java index eca4c75216a..4c0739d3482 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/StreamIdleTimeoutTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/StreamIdleTimeoutTest.java @@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.tests; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; @@ -29,6 +30,7 @@ import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.server.AbstractHTTP3ServerConnectionFactory; import org.junit.jupiter.api.Test; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -39,9 +41,16 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest @Test public void testClientStreamIdleTimeout() throws Exception { + AtomicReference serverSessionRef = new AtomicReference<>(); CountDownLatch serverLatch = new CountDownLatch(1); startServer(new Session.Server.Listener() { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + } + @Override public Stream.Listener onRequest(Stream stream, HeadersFrame frame) { @@ -93,13 +102,13 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest long streamIdleTimeout = 1000; client.setStreamIdleTimeout(streamIdleTimeout); - Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) .get(5, TimeUnit.SECONDS); CountDownLatch clientIdleLatch = new CountDownLatch(1); HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle"); MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY); - session.newRequest(new HeadersFrame(request1, false), new Stream.Listener() + clientSession.newRequest(new HeadersFrame(request1, false), new Stream.Listener() { @Override public boolean onIdleTimeout(Stream stream, Throwable failure) @@ -114,11 +123,14 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest assertTrue(clientIdleLatch.await(2 * streamIdleTimeout, TimeUnit.MILLISECONDS)); assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getStreams().isEmpty()); + await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getStreams().isEmpty()); + // The session should still be open, verify by sending another request. CountDownLatch clientLatch = new CountDownLatch(1); HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY); - session.newRequest(new HeadersFrame(request2, true), new Stream.Listener() + clientSession.newRequest(new HeadersFrame(request2, true), new Stream.Listener() { @Override public void onResponse(Stream stream, HeadersFrame frame) @@ -128,15 +140,25 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest }); assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + + await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getStreams().isEmpty()); + await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getStreams().isEmpty()); } @Test public void testServerStreamIdleTimeout() throws Exception { + AtomicReference serverSessionRef = new AtomicReference<>(); long idleTimeout = 1000; CountDownLatch serverIdleLatch = new CountDownLatch(1); startServer(new Session.Server.Listener() { + @Override + public void onAccept(Session session) + { + serverSessionRef.set(session); + } + @Override public Stream.Listener onRequest(Stream stream, HeadersFrame frame) { @@ -166,13 +188,13 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest h3.setStreamIdleTimeout(idleTimeout); startClient(); - Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) .get(5, TimeUnit.SECONDS); CountDownLatch clientFailureLatch = new CountDownLatch(1); HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle"); MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY); - session.newRequest(new HeadersFrame(request1, false), new Stream.Listener() + clientSession.newRequest(new HeadersFrame(request1, false), new Stream.Listener() { @Override public void onFailure(long error, Throwable failure) @@ -187,11 +209,14 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest assertTrue(serverIdleLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS)); + await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getStreams().isEmpty()); + await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getStreams().isEmpty()); + // The session should still be open, verify by sending another request. CountDownLatch clientLatch = new CountDownLatch(1); HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY); - session.newRequest(new HeadersFrame(request2, true), new Stream.Listener() + clientSession.newRequest(new HeadersFrame(request2, true), new Stream.Listener() { @Override public void onResponse(Stream stream, HeadersFrame frame) @@ -201,5 +226,8 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest }); assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); + + await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getStreams().isEmpty()); + await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getStreams().isEmpty()); } }