Issue #6728 - QUIC and HTTP/3

- Avoid leaking stream instances.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-01 22:32:58 +02:00
parent c8107539df
commit a44984e6e4
6 changed files with 96 additions and 15 deletions

View File

@ -66,6 +66,8 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("received response {}#{} on {}", frame, streamId, this); LOG.debug("received response {}#{} on {}", frame, streamId, this);
stream.processResponse(frame); stream.processResponse(frame);
if (frame.isLast())
removeStream(stream);
} }
else else
{ {

View File

@ -14,6 +14,8 @@
package org.eclipse.jetty.http3.api; package org.eclipse.jetty.http3.api;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -57,6 +59,14 @@ public interface Session
return false; return false;
} }
/**
* @return a snapshot of all the streams currently belonging to this session
*/
public default Collection<Stream> getStreams()
{
return Collections.emptyList();
}
/** /**
* <p>The client-side HTTP/3 API representing a connection with a server.</p> * <p>The client-side HTTP/3 API representing a connection with a server.</p>
* <p>Once a {@link Session} has been obtained, it can be used to make HTTP/3 requests:</p> * <p>Once a {@link Session} has been obtained, it can be used to make HTTP/3 requests:</p>

View File

@ -14,13 +14,16 @@
package org.eclipse.jetty.http3.internal; package org.eclipse.jetty.http3.internal;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame; import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.Frame; import org.eclipse.jetty.http3.frames.Frame;
import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.http3.frames.HeadersFrame;
@ -86,6 +89,12 @@ public abstract class HTTP3Session implements Session, ParserListener
return closeState != CloseState.NOT_CLOSED; return closeState != CloseState.NOT_CLOSED;
} }
@Override
public Collection<Stream> getStreams()
{
return List.copyOf(streams.values());
}
public void close(long error, String reason) public void close(long error, String reason)
{ {
getProtocolSession().close(error, reason); getProtocolSession().close(error, reason);
@ -109,10 +118,12 @@ public abstract class HTTP3Session implements Session, ParserListener
protected HTTP3Stream createStream(QuicStreamEndPoint endPoint) protected HTTP3Stream createStream(QuicStreamEndPoint endPoint)
{ {
long streamId = endPoint.getStreamId(); long streamId = endPoint.getStreamId();
HTTP3Stream stream = newHTTP3Stream(endPoint); return streams.compute(streamId, (id, stream) ->
if (streams.put(streamId, stream) != null) {
throw new IllegalStateException("duplicate stream id " + streamId); if (stream != null)
return stream; throw new IllegalStateException("duplicate stream id " + streamId);
return newHTTP3Stream(endPoint);
});
} }
protected HTTP3Stream getOrCreateStream(QuicStreamEndPoint endPoint) protected HTTP3Stream getOrCreateStream(QuicStreamEndPoint endPoint)
@ -130,6 +141,8 @@ public abstract class HTTP3Session implements Session, ParserListener
if (idleTimeout > 0) if (idleTimeout > 0)
stream.setIdleTimeout(idleTimeout); stream.setIdleTimeout(idleTimeout);
} }
if (LOG.isDebugEnabled())
LOG.debug("created {} on {}", stream, this);
return stream; return stream;
} }
@ -138,6 +151,16 @@ public abstract class HTTP3Session implements Session, ParserListener
return streams.get(streamId); return streams.get(streamId);
} }
public void removeStream(HTTP3Stream stream)
{
boolean removed = streams.remove(stream.getId()) != null;
if (removed)
{
if (LOG.isDebugEnabled())
LOG.debug("destroyed {} on {}", stream, this);
}
}
public abstract void writeFrame(long streamId, Frame frame, Callback callback); public abstract void writeFrame(long streamId, Frame frame, Callback callback);
public Map<Long, Long> onPreface() public Map<Long, Long> onPreface()
@ -196,6 +219,7 @@ public abstract class HTTP3Session implements Session, ParserListener
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("received trailer {}#{} on {}", frame, streamId, this); LOG.debug("received trailer {}#{} on {}", frame, streamId, this);
stream.processTrailer(frame); stream.processTrailer(frame);
removeStream(stream);
} }
} }
@ -232,7 +256,10 @@ public abstract class HTTP3Session implements Session, ParserListener
LOG.debug("stream failure {}/{} for stream #{} on {}", error, failure, streamId, this, failure); LOG.debug("stream failure {}/{} for stream #{} on {}", error, failure, streamId, this, failure);
HTTP3Stream stream = getStream(streamId); HTTP3Stream stream = getStream(streamId);
if (stream != null) if (stream != null)
{
stream.processFailure(error, failure); stream.processFailure(error, failure);
removeStream(stream);
}
} }
@Override @Override
@ -282,7 +309,9 @@ public abstract class HTTP3Session implements Session, ParserListener
@Override @Override
protected boolean onExpired(HTTP3Stream stream) protected boolean onExpired(HTTP3Stream stream)
{ {
stream.processIdleTimeout(new TimeoutException("idle timeout " + stream.getIdleTimeout() + " ms elapsed")); if (stream.processIdleTimeout(new TimeoutException("idle timeout " + stream.getIdleTimeout() + " ms elapsed")))
removeStream(stream);
// The iterator returned from the method above does not support removal.
return false; return false;
} }
} }

View File

@ -97,12 +97,14 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
expireNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(idleTimeout); expireNanoTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(idleTimeout);
} }
void processIdleTimeout(TimeoutException timeout) boolean processIdleTimeout(TimeoutException timeout)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("idle timeout {} ms expired on {}", getIdleTimeout(), this); LOG.debug("idle timeout {} ms expired on {}", getIdleTimeout(), this);
if (notifyIdleTimeout(timeout)) boolean close = notifyIdleTimeout(timeout);
if (close)
endPoint.close(ErrorCode.REQUEST_CANCELLED_ERROR.code(), timeout); endPoint.close(ErrorCode.REQUEST_CANCELLED_ERROR.code(), timeout);
return close;
} }
@Override @Override
@ -120,8 +122,16 @@ public class HTTP3Stream implements Stream, CyclicTimeouts.Expirable
@Override @Override
public Data readData() public Data readData()
{ {
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection(); try
return connection.readData(); {
HTTP3StreamConnection connection = (HTTP3StreamConnection)endPoint.getConnection();
return connection.readData();
}
catch (Throwable x)
{
session.removeStream(this);
throw x;
}
} }
@Override @Override

View File

@ -63,6 +63,8 @@ public class HTTP3SessionServer extends HTTP3Session implements Session.Server
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("received request {}#{} on {}", frame, streamId, this); LOG.debug("received request {}#{} on {}", frame, streamId, this);
stream.processRequest(frame); stream.processRequest(frame);
if (frame.isLast())
removeStream(stream);
} }
else else
{ {

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http3.tests;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpMethod;
@ -29,6 +30,7 @@ import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.server.AbstractHTTP3ServerConnectionFactory; import org.eclipse.jetty.http3.server.AbstractHTTP3ServerConnectionFactory;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -39,9 +41,16 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
@Test @Test
public void testClientStreamIdleTimeout() throws Exception public void testClientStreamIdleTimeout() throws Exception
{ {
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(1); CountDownLatch serverLatch = new CountDownLatch(1);
startServer(new Session.Server.Listener() startServer(new Session.Server.Listener()
{ {
@Override
public void onAccept(Session session)
{
serverSessionRef.set(session);
}
@Override @Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame) public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{ {
@ -93,13 +102,13 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
long streamIdleTimeout = 1000; long streamIdleTimeout = 1000;
client.setStreamIdleTimeout(streamIdleTimeout); client.setStreamIdleTimeout(streamIdleTimeout);
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS); .get(5, TimeUnit.SECONDS);
CountDownLatch clientIdleLatch = new CountDownLatch(1); CountDownLatch clientIdleLatch = new CountDownLatch(1);
HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle"); HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle");
MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY); MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY);
session.newRequest(new HeadersFrame(request1, false), new Stream.Listener() clientSession.newRequest(new HeadersFrame(request1, false), new Stream.Listener()
{ {
@Override @Override
public boolean onIdleTimeout(Stream stream, Throwable failure) public boolean onIdleTimeout(Stream stream, Throwable failure)
@ -114,11 +123,14 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
assertTrue(clientIdleLatch.await(2 * streamIdleTimeout, TimeUnit.MILLISECONDS)); assertTrue(clientIdleLatch.await(2 * streamIdleTimeout, TimeUnit.MILLISECONDS));
assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getStreams().isEmpty());
await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getStreams().isEmpty());
// The session should still be open, verify by sending another request. // The session should still be open, verify by sending another request.
CountDownLatch clientLatch = new CountDownLatch(1); CountDownLatch clientLatch = new CountDownLatch(1);
HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY); MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY);
session.newRequest(new HeadersFrame(request2, true), new Stream.Listener() clientSession.newRequest(new HeadersFrame(request2, true), new Stream.Listener()
{ {
@Override @Override
public void onResponse(Stream stream, HeadersFrame frame) public void onResponse(Stream stream, HeadersFrame frame)
@ -128,15 +140,25 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
}); });
assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getStreams().isEmpty());
await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getStreams().isEmpty());
} }
@Test @Test
public void testServerStreamIdleTimeout() throws Exception public void testServerStreamIdleTimeout() throws Exception
{ {
AtomicReference<Session> serverSessionRef = new AtomicReference<>();
long idleTimeout = 1000; long idleTimeout = 1000;
CountDownLatch serverIdleLatch = new CountDownLatch(1); CountDownLatch serverIdleLatch = new CountDownLatch(1);
startServer(new Session.Server.Listener() startServer(new Session.Server.Listener()
{ {
@Override
public void onAccept(Session session)
{
serverSessionRef.set(session);
}
@Override @Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame) public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{ {
@ -166,13 +188,13 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
h3.setStreamIdleTimeout(idleTimeout); h3.setStreamIdleTimeout(idleTimeout);
startClient(); startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) Session.Client clientSession = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS); .get(5, TimeUnit.SECONDS);
CountDownLatch clientFailureLatch = new CountDownLatch(1); CountDownLatch clientFailureLatch = new CountDownLatch(1);
HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle"); HttpURI uri1 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/idle");
MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY); MetaData.Request request1 = new MetaData.Request(HttpMethod.GET.asString(), uri1, HttpVersion.HTTP_3, HttpFields.EMPTY);
session.newRequest(new HeadersFrame(request1, false), new Stream.Listener() clientSession.newRequest(new HeadersFrame(request1, false), new Stream.Listener()
{ {
@Override @Override
public void onFailure(long error, Throwable failure) public void onFailure(long error, Throwable failure)
@ -187,11 +209,14 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
assertTrue(serverIdleLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); assertTrue(serverIdleLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientFailureLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getStreams().isEmpty());
await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getStreams().isEmpty());
// The session should still be open, verify by sending another request. // The session should still be open, verify by sending another request.
CountDownLatch clientLatch = new CountDownLatch(1); CountDownLatch clientLatch = new CountDownLatch(1);
HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); HttpURI uri2 = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY); MetaData.Request request2 = new MetaData.Request(HttpMethod.GET.asString(), uri2, HttpVersion.HTTP_3, HttpFields.EMPTY);
session.newRequest(new HeadersFrame(request2, true), new Stream.Listener() clientSession.newRequest(new HeadersFrame(request2, true), new Stream.Listener()
{ {
@Override @Override
public void onResponse(Stream stream, HeadersFrame frame) public void onResponse(Stream stream, HeadersFrame frame)
@ -201,5 +226,8 @@ public class StreamIdleTimeoutTest extends AbstractHTTP3ClientServerTest
}); });
assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
await().atMost(1, TimeUnit.SECONDS).until(() -> clientSession.getStreams().isEmpty());
await().atMost(1, TimeUnit.SECONDS).until(() -> serverSessionRef.get().getStreams().isEmpty());
} }
} }