Issue #6728 - QUIC and HTTP/3
- Implemented component hierarchy. - Implemented dumpability of components. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
0ea4b3f7c4
commit
392d59e9f7
|
@ -26,6 +26,7 @@ import org.eclipse.jetty.io.Connection;
|
|||
import org.eclipse.jetty.quic.client.ClientQuicConnection;
|
||||
import org.eclipse.jetty.quic.client.QuicClientConnectorConfigurator;
|
||||
import org.eclipse.jetty.quic.common.QuicConnection;
|
||||
import org.eclipse.jetty.quic.common.QuicSessionContainer;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
|
@ -46,6 +47,7 @@ public class HTTP3Client extends ContainerLifeCycle
|
|||
public static final String SESSION_PROMISE_CONTEXT_KEY = CLIENT_CONTEXT_KEY + ".promise";
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Client.class);
|
||||
|
||||
private final QuicSessionContainer container = new QuicSessionContainer();
|
||||
private final ClientConnector connector;
|
||||
private List<String> protocols = List.of("h3");
|
||||
private long streamIdleTimeout = 30000;
|
||||
|
@ -58,6 +60,12 @@ public class HTTP3Client extends ContainerLifeCycle
|
|||
{
|
||||
this.connector = new ClientConnector(new QuicClientConnectorConfigurator(this::configureConnection));
|
||||
addBean(connector);
|
||||
addBean(container);
|
||||
}
|
||||
|
||||
public ClientConnector getClientConnector()
|
||||
{
|
||||
return connector;
|
||||
}
|
||||
|
||||
public int getInputBufferSize()
|
||||
|
@ -146,6 +154,7 @@ public class HTTP3Client extends ContainerLifeCycle
|
|||
if (connection instanceof QuicConnection)
|
||||
{
|
||||
QuicConnection quicConnection = (QuicConnection)connection;
|
||||
quicConnection.addEventListener(container);
|
||||
quicConnection.setInputBufferSize(getInputBufferSize());
|
||||
quicConnection.setOutputBufferSize(getOutputBufferSize());
|
||||
quicConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
|
||||
|
|
|
@ -74,7 +74,6 @@ public class HTTP3ClientConnectionFactory implements ClientConnectionFactory, Pr
|
|||
@Override
|
||||
public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
|
||||
{
|
||||
// TODO: can the downcasts be removed?
|
||||
QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint;
|
||||
long streamId = streamEndPoint.getStreamId();
|
||||
ClientHTTP3Session http3Session = (ClientHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
|
||||
|
|
|
@ -47,36 +47,41 @@ public class ClientHTTP3Session extends ClientProtocolSession
|
|||
private final ControlFlusher controlFlusher;
|
||||
private final HTTP3Flusher messageFlusher;
|
||||
|
||||
public ClientHTTP3Session(ClientQuicSession session, Session.Client.Listener listener, Promise<Session.Client> promise, int maxBlockedStreams, int maxResponseHeadersSize)
|
||||
public ClientHTTP3Session(ClientQuicSession quicSession, Session.Client.Listener listener, Promise<Session.Client> promise, int maxBlockedStreams, int maxResponseHeadersSize)
|
||||
{
|
||||
super(session);
|
||||
super(quicSession);
|
||||
this.session = new HTTP3SessionClient(this, listener, promise);
|
||||
addBean(session);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("initializing HTTP/3 streams");
|
||||
|
||||
long encoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
|
||||
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
|
||||
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
|
||||
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
|
||||
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams);
|
||||
addBean(encoder);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
|
||||
|
||||
long decoderStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
|
||||
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
|
||||
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
|
||||
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
|
||||
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxResponseHeadersSize);
|
||||
addBean(decoder);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
|
||||
|
||||
long controlStreamId = getQuicSession().newStreamId(StreamType.CLIENT_UNIDIRECTIONAL);
|
||||
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
|
||||
this.controlFlusher = new ControlFlusher(session, controlEndPoint, true);
|
||||
this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true);
|
||||
addBean(controlFlusher);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
|
||||
|
||||
// TODO: make parameters configurable.
|
||||
this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true);
|
||||
this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, 4096, true);
|
||||
addBean(messageFlusher);
|
||||
}
|
||||
|
||||
public QpackDecoder getQpackDecoder()
|
||||
|
@ -100,8 +105,9 @@ public class ClientHTTP3Session extends ClientProtocolSession
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
super.doStart();
|
||||
// Queue the mandatory SETTINGS frame.
|
||||
Map<Long, Long> settings = session.onPreface();
|
||||
if (settings == null)
|
||||
|
|
|
@ -42,13 +42,15 @@ import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
|
|||
import org.eclipse.jetty.util.Atomics;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class HTTP3Session implements Session, ParserListener
|
||||
public abstract class HTTP3Session extends ContainerLifeCycle implements Session, ParserListener
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(HTTP3Session.class);
|
||||
|
||||
|
@ -56,7 +58,7 @@ public abstract class HTTP3Session implements Session, ParserListener
|
|||
private final AtomicLong lastId = new AtomicLong();
|
||||
private final Map<Long, HTTP3Stream> streams = new ConcurrentHashMap<>();
|
||||
private final ProtocolSession session;
|
||||
private final Listener listener;
|
||||
private final Session.Listener listener;
|
||||
private final AtomicInteger streamCount = new AtomicInteger();
|
||||
private final StreamTimeouts streamTimeouts;
|
||||
private long streamIdleTimeout;
|
||||
|
@ -66,7 +68,7 @@ public abstract class HTTP3Session implements Session, ParserListener
|
|||
private Runnable zeroStreamsAction;
|
||||
private Callback.Completable shutdown;
|
||||
|
||||
public HTTP3Session(ProtocolSession session, Listener listener)
|
||||
public HTTP3Session(ProtocolSession session, Session.Listener listener)
|
||||
{
|
||||
this.session = session;
|
||||
this.listener = listener;
|
||||
|
@ -78,7 +80,7 @@ public abstract class HTTP3Session implements Session, ParserListener
|
|||
return session;
|
||||
}
|
||||
|
||||
public Listener getListener()
|
||||
public Session.Listener getListener()
|
||||
{
|
||||
return listener;
|
||||
}
|
||||
|
@ -829,6 +831,12 @@ public abstract class HTTP3Session implements Session, ParserListener
|
|||
return closeState == CloseState.CLOSED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
dumpObjects(out, indent, new DumpableCollection("streams", getStreams()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -23,7 +23,6 @@ import org.eclipse.jetty.http3.qpack.internal.EncodableEntry;
|
|||
import org.eclipse.jetty.http3.qpack.internal.QpackContext;
|
||||
import org.eclipse.jetty.http3.qpack.internal.table.Entry;
|
||||
import org.eclipse.jetty.http3.qpack.internal.table.StaticTable;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
|
||||
public class QpackFieldPreEncoder implements HttpFieldPreEncoder
|
||||
{
|
||||
|
|
|
@ -113,7 +113,6 @@ public abstract class AbstractHTTP3ServerConnectionFactory extends AbstractConne
|
|||
@Override
|
||||
public Connection newConnection(Connector connector, EndPoint endPoint)
|
||||
{
|
||||
// TODO: can the downcasts be removed?
|
||||
QuicStreamEndPoint streamEndPoint = (QuicStreamEndPoint)endPoint;
|
||||
long streamId = streamEndPoint.getStreamId();
|
||||
ServerHTTP3Session http3Session = (ServerHTTP3Session)streamEndPoint.getQuicSession().getProtocolSession();
|
||||
|
|
|
@ -21,7 +21,6 @@ import org.eclipse.jetty.http3.internal.HTTP3Stream;
|
|||
import org.eclipse.jetty.http3.server.internal.ServerHTTP3Session;
|
||||
import org.eclipse.jetty.http3.server.internal.ServerHTTP3StreamConnection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.quic.common.ProtocolSession;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
|
||||
public class HTTP3ServerConnectionFactory extends AbstractHTTP3ServerConnectionFactory
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.eclipse.jetty.quic.common.StreamType;
|
|||
import org.eclipse.jetty.quic.server.ServerProtocolSession;
|
||||
import org.eclipse.jetty.quic.server.ServerQuicSession;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||
import org.eclipse.jetty.util.thread.Invocable;
|
||||
import org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy;
|
||||
|
@ -53,39 +52,44 @@ public class ServerHTTP3Session extends ServerProtocolSession
|
|||
private final AdaptiveExecutionStrategy strategy;
|
||||
private final HTTP3Producer producer = new HTTP3Producer();
|
||||
|
||||
public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize)
|
||||
public ServerHTTP3Session(ServerQuicSession quicSession, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize)
|
||||
{
|
||||
super(session);
|
||||
super(quicSession);
|
||||
this.session = new HTTP3SessionServer(this, listener);
|
||||
addBean(session);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("initializing HTTP/3 streams");
|
||||
|
||||
long encoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
|
||||
QuicStreamEndPoint encoderEndPoint = configureInstructionEndPoint(encoderStreamId);
|
||||
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(session, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
|
||||
InstructionFlusher encoderInstructionFlusher = new InstructionFlusher(quicSession, encoderEndPoint, EncoderStreamConnection.STREAM_TYPE);
|
||||
this.encoder = new QpackEncoder(new InstructionHandler(encoderInstructionFlusher), maxBlockedStreams);
|
||||
addBean(encoder);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created encoder stream #{} on {}", encoderStreamId, encoderEndPoint);
|
||||
|
||||
long decoderStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
|
||||
QuicStreamEndPoint decoderEndPoint = configureInstructionEndPoint(decoderStreamId);
|
||||
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(session, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
|
||||
InstructionFlusher decoderInstructionFlusher = new InstructionFlusher(quicSession, decoderEndPoint, DecoderStreamConnection.STREAM_TYPE);
|
||||
this.decoder = new QpackDecoder(new InstructionHandler(decoderInstructionFlusher), maxRequestHeadersSize);
|
||||
addBean(decoder);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created decoder stream #{} on {}", decoderStreamId, decoderEndPoint);
|
||||
|
||||
long controlStreamId = getQuicSession().newStreamId(StreamType.SERVER_UNIDIRECTIONAL);
|
||||
QuicStreamEndPoint controlEndPoint = configureControlEndPoint(controlStreamId);
|
||||
this.controlFlusher = new ControlFlusher(session, controlEndPoint, true);
|
||||
this.controlFlusher = new ControlFlusher(quicSession, controlEndPoint, true);
|
||||
addBean(controlFlusher);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("created control stream #{} on {}", controlStreamId, controlEndPoint);
|
||||
|
||||
// TODO: make parameters configurable.
|
||||
this.messageFlusher = new HTTP3Flusher(session.getByteBufferPool(), encoder, 4096, true);
|
||||
this.messageFlusher = new HTTP3Flusher(quicSession.getByteBufferPool(), encoder, 4096, true);
|
||||
addBean(messageFlusher);
|
||||
|
||||
this.strategy = new AdaptiveExecutionStrategy(producer, getQuicSession().getExecutor());
|
||||
// TODO: call addBean instead
|
||||
LifeCycle.start(strategy);
|
||||
addBean(strategy);
|
||||
}
|
||||
|
||||
public QpackDecoder getQpackDecoder()
|
||||
|
@ -127,8 +131,9 @@ public class ServerHTTP3Session extends ServerProtocolSession
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
super.doStart();
|
||||
// Queue the mandatory SETTINGS frame.
|
||||
Map<Long, Long> settings = session.onPreface();
|
||||
if (settings == null)
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.eclipse.jetty.quic.client.ClientQuicSession;
|
|||
import org.eclipse.jetty.quic.common.QuicConnection;
|
||||
import org.eclipse.jetty.quic.server.ServerQuicSession;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
|
|
@ -35,8 +35,9 @@ public class ClientProtocolSession extends ProtocolSession
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
super.doStart();
|
||||
// Create a single bidirectional, client-initiated,
|
||||
// QUIC stream that plays the role of the TCP stream.
|
||||
long streamId = getQuicSession().newStreamId(StreamType.CLIENT_BIDIRECTIONAL);
|
||||
|
|
|
@ -19,10 +19,11 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public abstract class ProtocolSession
|
||||
public abstract class ProtocolSession extends ContainerLifeCycle
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ProtocolSession.class);
|
||||
|
||||
|
@ -39,8 +40,6 @@ public abstract class ProtocolSession
|
|||
return session;
|
||||
}
|
||||
|
||||
public abstract void onOpen();
|
||||
|
||||
public void process()
|
||||
{
|
||||
// This method is called by the network thread and
|
||||
|
|
|
@ -18,9 +18,11 @@ import java.net.SocketAddress;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collection;
|
||||
import java.util.EventListener;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
|
@ -33,6 +35,7 @@ import org.eclipse.jetty.quic.quiche.QuicheConnectionId;
|
|||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -49,6 +52,7 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(QuicConnection.class);
|
||||
|
||||
private final List<QuicSession.Listener> listeners = new CopyOnWriteArrayList<>();
|
||||
private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>();
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private final Scheduler scheduler;
|
||||
|
@ -110,6 +114,22 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
return List.copyOf(sessions.values());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addEventListener(EventListener listener)
|
||||
{
|
||||
super.addEventListener(listener);
|
||||
if (listener instanceof QuicSession.Listener)
|
||||
listeners.add((QuicSession.Listener)listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeEventListener(EventListener listener)
|
||||
{
|
||||
super.removeEventListener(listener);
|
||||
if (listener instanceof QuicSession.Listener)
|
||||
listeners.remove((QuicSession.Listener)listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract boolean onIdleExpired();
|
||||
|
||||
|
@ -133,7 +153,10 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
LOG.debug("outward close {} on {}", session, this);
|
||||
QuicheConnectionId connectionId = session.getConnectionId();
|
||||
if (connectionId != null)
|
||||
{
|
||||
sessions.remove(connectionId);
|
||||
LifeCycle.stop(session);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -189,6 +212,8 @@ public abstract class QuicConnection extends AbstractConnection
|
|||
session.setConnectionId(quicheConnectionId);
|
||||
session.setIdleTimeout(getEndPoint().getIdleTimeout());
|
||||
sessions.put(quicheConnectionId, session);
|
||||
listeners.forEach(session::addEventListener);
|
||||
LifeCycle.start(session);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -19,6 +19,7 @@ import java.nio.ByteBuffer;
|
|||
import java.util.ArrayDeque;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EventListener;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -37,7 +38,8 @@ import org.eclipse.jetty.quic.quiche.QuicheConnection;
|
|||
import org.eclipse.jetty.quic.quiche.QuicheConnectionId;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.eclipse.jetty.util.thread.ExecutionStrategy;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
|
@ -55,7 +57,7 @@ import org.slf4j.LoggerFactory;
|
|||
* <p>On the receive side, a QuicSession <em>fans-out</em> to multiple {@link QuicStreamEndPoint}s.</p>
|
||||
* <p>On the send side, many {@link QuicStreamEndPoint}s <em>fan-in</em> to a QuicSession.</p>
|
||||
*/
|
||||
public abstract class QuicSession
|
||||
public abstract class QuicSession extends ContainerLifeCycle
|
||||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class);
|
||||
|
||||
|
@ -83,12 +85,57 @@ public abstract class QuicSession
|
|||
this.quicheConnection = quicheConnection;
|
||||
this.connection = connection;
|
||||
this.flusher = new Flusher(scheduler);
|
||||
addBean(flusher);
|
||||
this.strategy = new AdaptiveExecutionStrategy(new Producer(), executor);
|
||||
addBean(strategy);
|
||||
this.remoteAddress = remoteAddress;
|
||||
LifeCycle.start(strategy);
|
||||
Arrays.setAll(ids, i -> new AtomicLong());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws Exception
|
||||
{
|
||||
super.doStart();
|
||||
getEventListeners().stream()
|
||||
.filter(Listener.class::isInstance)
|
||||
.map(Listener.class::cast)
|
||||
.forEach(this::notifyOpened);
|
||||
}
|
||||
|
||||
private void notifyOpened(Listener listener)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onOpened(this);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("failure notifying listener {}", listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
super.doStop();
|
||||
getEventListeners().stream()
|
||||
.filter(Listener.class::isInstance)
|
||||
.map(Listener.class::cast)
|
||||
.forEach(this::notifyClosed);
|
||||
}
|
||||
|
||||
private void notifyClosed(Listener listener)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onClosed(this);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("failure notifying listener {}", listener, x);
|
||||
}
|
||||
}
|
||||
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return executor;
|
||||
|
@ -149,11 +196,6 @@ public abstract class QuicSession
|
|||
return protocolSession.onIdleTimeout();
|
||||
}
|
||||
|
||||
public void onOpen()
|
||||
{
|
||||
protocolSession.onOpen();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param streamType the stream type
|
||||
* @return a new stream ID for the given type
|
||||
|
@ -284,7 +326,7 @@ public abstract class QuicSession
|
|||
if (protocolSession == null)
|
||||
{
|
||||
protocolSession = createProtocolSession();
|
||||
onOpen();
|
||||
addManaged(protocolSession);
|
||||
}
|
||||
protocolSession.process();
|
||||
}
|
||||
|
@ -372,7 +414,6 @@ public abstract class QuicSession
|
|||
try
|
||||
{
|
||||
endPoints.clear();
|
||||
LifeCycle.stop(strategy);
|
||||
flusher.close();
|
||||
getQuicConnection().outwardClose(this, failure);
|
||||
}
|
||||
|
@ -383,6 +424,12 @@ public abstract class QuicSession
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
dumpObjects(out, indent, new DumpableCollection("endPoints", getQuicStreamEndPoints()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -494,4 +541,15 @@ public abstract class QuicSession
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public interface Listener extends EventListener
|
||||
{
|
||||
public default void onOpened(QuicSession session)
|
||||
{
|
||||
}
|
||||
|
||||
public default void onClosed(QuicSession session)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
|
||||
//
|
||||
// This program and the accompanying materials are made available under the
|
||||
// terms of the Eclipse Public License v. 2.0 which is available at
|
||||
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
|
||||
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
|
||||
//
|
||||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.quic.common;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
|
||||
public class QuicSessionContainer extends AbstractLifeCycle implements QuicSession.Listener, Dumpable
|
||||
{
|
||||
private final Set<QuicSession> sessions = ConcurrentHashMap.newKeySet();
|
||||
|
||||
@Override
|
||||
public void onOpened(QuicSession session)
|
||||
{
|
||||
sessions.add(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClosed(QuicSession session)
|
||||
{
|
||||
sessions.remove(session);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String dump()
|
||||
{
|
||||
return Dumpable.dump(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
Dumpable.dumpObjects(out, indent, this, new DumpableCollection("sessions", sessions));
|
||||
}
|
||||
}
|
|
@ -29,6 +29,7 @@ import org.eclipse.jetty.io.DatagramChannelEndPoint;
|
|||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.ManagedSelector;
|
||||
import org.eclipse.jetty.io.SelectorManager;
|
||||
import org.eclipse.jetty.quic.common.QuicSessionContainer;
|
||||
import org.eclipse.jetty.quic.quiche.QuicheConfig;
|
||||
import org.eclipse.jetty.quic.quiche.SSLKeyPair;
|
||||
import org.eclipse.jetty.server.AbstractNetworkConnector;
|
||||
|
@ -40,6 +41,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
|
||||
public class QuicServerConnector extends AbstractNetworkConnector
|
||||
{
|
||||
private final QuicSessionContainer container = new QuicSessionContainer();
|
||||
private final ServerDatagramSelectorManager selectorManager;
|
||||
private final SslContextFactory.Server sslContextFactory;
|
||||
private final QuicheConfig quicheConfig = new QuicheConfig();
|
||||
|
@ -62,6 +64,7 @@ public class QuicServerConnector extends AbstractNetworkConnector
|
|||
addBean(this.selectorManager);
|
||||
this.sslContextFactory = sslContextFactory;
|
||||
addBean(this.sslContextFactory);
|
||||
addBean(container);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -246,6 +249,7 @@ public class QuicServerConnector extends AbstractNetworkConnector
|
|||
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
|
||||
{
|
||||
ServerQuicConnection connection = new ServerQuicConnection(QuicServerConnector.this, endpoint, quicheConfig);
|
||||
connection.addEventListener(container);
|
||||
connection.setInputBufferSize(getInputBufferSize());
|
||||
connection.setOutputBufferSize(getOutputBufferSize());
|
||||
connection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
|
||||
|
|
|
@ -33,11 +33,6 @@ public class ServerProtocolSession extends ProtocolSession
|
|||
return (ServerQuicSession)super.getQuicSession();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onReadable(long readableStreamId)
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue