Issue #6728 - QUIC and HTTP/3

- Added more configuration properties.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-05 11:01:45 +02:00
parent a44984e6e4
commit f061cc41db
6 changed files with 132 additions and 35 deletions

View File

@ -18,12 +18,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;
import org.eclipse.jetty.http3.api.Session;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.quic.client.ClientQuicConnection;
import org.eclipse.jetty.quic.client.QuicClientConnectorConfigurator;
import org.eclipse.jetty.quic.common.QuicConnection;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -47,6 +50,10 @@ public class HTTP3Client extends ContainerLifeCycle
private final ClientConnector connector;
private List<String> protocols = List.of("h3");
private long streamIdleTimeout = 30000;
private int inputBufferSize = 2048;
private int outputBufferSize = 2048;
private boolean useInputDirectByteBuffers = true;
private boolean useOutputDirectByteBuffers = true;
public HTTP3Client()
{
@ -54,6 +61,46 @@ public class HTTP3Client extends ContainerLifeCycle
addBean(connector);
}
public int getInputBufferSize()
{
return inputBufferSize;
}
public void setInputBufferSize(int inputBufferSize)
{
this.inputBufferSize = inputBufferSize;
}
public int getOutputBufferSize()
{
return outputBufferSize;
}
public void setOutputBufferSize(int outputBufferSize)
{
this.outputBufferSize = outputBufferSize;
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
@ManagedAttribute("The ALPN protocol list")
public List<String> getProtocols()
{
@ -87,6 +134,7 @@ public class HTTP3Client extends ContainerLifeCycle
context.put(ClientQuicConnection.APPLICATION_PROTOCOLS, getProtocols());
context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, factory);
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, Promise.from(ioConnection -> {}, completable::failed));
context.put(QuicClientConnectorConfigurator.CONNECTION_CONFIGURATOR_CONTEXT_KEY, (UnaryOperator<Connection>)this::configureConnection);
if (LOG.isDebugEnabled())
LOG.debug("connecting to {}", address);
@ -94,4 +142,17 @@ public class HTTP3Client extends ContainerLifeCycle
connector.connect(address, context);
return completable;
}
private Connection configureConnection(Connection connection)
{
if (connection instanceof QuicConnection)
{
QuicConnection quicConnection = (QuicConnection)connection;
quicConnection.setInputBufferSize(getInputBufferSize());
quicConnection.setOutputBufferSize(getOutputBufferSize());
quicConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
quicConnection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
}
return connection;
}
}

View File

@ -44,14 +44,14 @@ public class ClientHTTP3Session extends ClientProtocolSession
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final HTTP3SessionClient applicationSession;
private final HTTP3SessionClient session;
private final ControlFlusher controlFlusher;
private final HTTP3Flusher messageFlusher;
public ClientHTTP3Session(ClientQuicSession session, Session.Client.Listener listener, Promise<Session.Client> promise, int maxBlockedStreams, int maxResponseHeadersSize)
{
super(session);
this.applicationSession = new HTTP3SessionClient(this, listener, promise);
this.session = new HTTP3SessionClient(this, listener, promise);
if (LOG.isDebugEnabled())
LOG.debug("initializing HTTP/3 streams");
@ -87,29 +87,29 @@ public class ClientHTTP3Session extends ClientProtocolSession
public HTTP3SessionClient getSessionClient()
{
return applicationSession;
return session;
}
public long getStreamIdleTimeout()
{
return applicationSession.getStreamIdleTimeout();
return session.getStreamIdleTimeout();
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
applicationSession.setStreamIdleTimeout(streamIdleTimeout);
session.setStreamIdleTimeout(streamIdleTimeout);
}
@Override
public void onOpen()
{
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = applicationSession.onPreface();
Map<Long, Long> settings = session.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, applicationSession::onOpen, this::fail));
controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail));
controlFlusher.iterate();
}
@ -154,12 +154,12 @@ public class ClientHTTP3Session extends ClientProtocolSession
{
if (LOG.isDebugEnabled())
LOG.debug("session closed remotely {} {}", closeInfo, this);
applicationSession.notifySessionFailure(closeInfo.error(), closeInfo.reason());
session.notifySessionFailure(closeInfo.error(), closeInfo.reason());
}
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession);
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, session);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
@ -174,6 +174,6 @@ public class ClientHTTP3Session extends ClientProtocolSession
public void onDataAvailable(long streamId)
{
applicationSession.onDataAvailable(streamId);
session.onDataAvailable(streamId);
}
}

View File

@ -43,14 +43,14 @@ public class ServerHTTP3Session extends ServerProtocolSession
private final QpackEncoder encoder;
private final QpackDecoder decoder;
private final HTTP3SessionServer applicationSession;
private final HTTP3SessionServer session;
private final ControlFlusher controlFlusher;
private final HTTP3Flusher messageFlusher;
public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize)
{
super(session);
this.applicationSession = new HTTP3SessionServer(this, listener);
this.session = new HTTP3SessionServer(this, listener);
if (LOG.isDebugEnabled())
LOG.debug("initializing HTTP/3 streams");
@ -86,29 +86,29 @@ public class ServerHTTP3Session extends ServerProtocolSession
public HTTP3SessionServer getSessionServer()
{
return applicationSession;
return session;
}
public long getStreamIdleTimeout()
{
return applicationSession.getStreamIdleTimeout();
return session.getStreamIdleTimeout();
}
public void setStreamIdleTimeout(long streamIdleTimeout)
{
applicationSession.setStreamIdleTimeout(streamIdleTimeout);
session.setStreamIdleTimeout(streamIdleTimeout);
}
@Override
public void onOpen()
{
// Queue the mandatory SETTINGS frame.
Map<Long, Long> settings = applicationSession.onPreface();
Map<Long, Long> settings = session.onPreface();
if (settings == null)
settings = Map.of();
// TODO: add default settings.
SettingsFrame frame = new SettingsFrame(settings);
controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, applicationSession::onOpen, this::fail));
controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail));
controlFlusher.iterate();
}
@ -158,7 +158,7 @@ public class ServerHTTP3Session extends ServerProtocolSession
private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint)
{
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession);
UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, session);
endPoint.setConnection(connection);
endPoint.onOpen();
connection.onOpen();
@ -173,15 +173,15 @@ public class ServerHTTP3Session extends ServerProtocolSession
protected void onDataAvailable(long streamId)
{
applicationSession.onDataAvailable(streamId);
session.onDataAvailable(streamId);
}
private void notifySessionFailure(CloseInfo closeInfo)
{
Session.Listener listener = applicationSession.getListener();
Session.Listener listener = session.getListener();
try
{
listener.onSessionFailure(applicationSession, closeInfo.error(), closeInfo.reason());
listener.onSessionFailure(session, closeInfo.error(), closeInfo.reason());
}
catch (Throwable x)
{

View File

@ -19,6 +19,7 @@ import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.Map;
import java.util.function.UnaryOperator;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
@ -28,6 +29,8 @@ import org.eclipse.jetty.io.ManagedSelector;
public class QuicClientConnectorConfigurator extends ClientConnector.Configurator
{
public static final String CONNECTION_CONFIGURATOR_CONTEXT_KEY = QuicClientConnectorConfigurator.class.getSimpleName() + ".connectionConfigurator";
@Override
public boolean isIntrinsicallySecure(ClientConnector clientConnector, SocketAddress address)
{
@ -50,6 +53,10 @@ public class QuicClientConnectorConfigurator extends ClientConnector.Configurato
@Override
public Connection newConnection(ClientConnector clientConnector, SocketAddress address, EndPoint endPoint, Map<String, Object> context)
{
return new ClientQuicConnection(clientConnector.getExecutor(), clientConnector.getScheduler(), clientConnector.getByteBufferPool(), endPoint, context);
@SuppressWarnings("unchecked")
UnaryOperator<Connection> configurator = (UnaryOperator<Connection>)context.get(CONNECTION_CONFIGURATOR_CONTEXT_KEY);
if (configurator == null)
configurator = UnaryOperator.identity();
return configurator.apply(new ClientQuicConnection(clientConnector.getExecutor(), clientConnector.getScheduler(), clientConnector.getByteBufferPool(), endPoint, context));
}
}

View File

@ -27,7 +27,6 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.DatagramChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.quiche.QuicheConnectionId;
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
@ -37,11 +36,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A {@link Connection} implementation that receives and sends datagram packets via its associated datagram {@link EndPoint}.</p>
* <p>A {@link Connection} implementation that receives and sends datagram packets via its associated {@link DatagramChannelEndPoint}.</p>
* <p>The received bytes are peeked to obtain the QUIC connection ID; each QUIC connection ID has an associated
* {@link QuicSession}, and the received bytes are then passed to the {@link QuicSession} for processing.</p>
* <p>On the receive side, a QuicConnection <em>fans-out</em> to multiple {@link QuicSession}s.</p>
* <p>On the send side, many {@link QuicSession}s <em>fan-in</em> to a QuicConnection.</p>
* <p>On the receive side, one QuicConnection <em>fans-out</em> to multiple {@link QuicSession}s.</p>
* <p>On the send side, many {@link QuicSession}s <em>fan-in</em> to one QuicConnection.</p>
*/
public abstract class QuicConnection extends AbstractConnection
{
@ -51,6 +50,9 @@ public abstract class QuicConnection extends AbstractConnection
private final Scheduler scheduler;
private final ByteBufferPool byteBufferPool;
private final Flusher flusher = new Flusher();
private int outputBufferSize;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;
protected QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint)
{
@ -69,10 +71,40 @@ public abstract class QuicConnection extends AbstractConnection
return byteBufferPool;
}
public int getOutputBufferSize()
{
return outputBufferSize;
}
public void setOutputBufferSize(int outputBufferSize)
{
this.outputBufferSize = outputBufferSize;
}
public boolean isUseInputDirectByteBuffers()
{
return useInputDirectByteBuffers;
}
public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers)
{
this.useInputDirectByteBuffers = useInputDirectByteBuffers;
}
public boolean isUseOutputDirectByteBuffers()
{
return useOutputDirectByteBuffers;
}
public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers)
{
this.useOutputDirectByteBuffers = useOutputDirectByteBuffers;
}
protected void closeSession(QuicheConnectionId quicheConnectionId, QuicSession session, Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("closing session of type {} cid={}", getClass().getSimpleName(), quicheConnectionId);
LOG.debug("closing session cid={} {}", quicheConnectionId, this);
if (quicheConnectionId != null)
sessions.remove(quicheConnectionId);
}
@ -81,12 +113,12 @@ public abstract class QuicConnection extends AbstractConnection
public void close()
{
if (LOG.isDebugEnabled())
LOG.debug("closing connection of type {}", getClass().getSimpleName());
LOG.debug("closing connection {}", this);
sessions.values().forEach(QuicSession::close);
sessions.clear();
super.close();
if (LOG.isDebugEnabled())
LOG.debug("closed connection of type {}", getClass().getSimpleName());
LOG.debug("closed connection {}", this);
}
@Override
@ -94,8 +126,7 @@ public abstract class QuicConnection extends AbstractConnection
{
try
{
// TODO make the buffer size configurable
ByteBuffer cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true)
{
BufferUtil.clear(cipherBuffer);
@ -182,7 +213,7 @@ public abstract class QuicConnection extends AbstractConnection
{
queue.offer(new Entry(callback, address, buffers));
}
flusher.iterate();
iterate();
}
@Override

View File

@ -34,7 +34,6 @@ import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.quic.quiche.QuicheConnection;
import org.eclipse.jetty.quic.quiche.QuicheConnectionId;
import org.eclipse.jetty.quic.quiche.ffi.LibQuiche;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.component.LifeCycle;
@ -392,8 +391,7 @@ public abstract class QuicSession
@Override
protected Action process() throws IOException
{
// TODO make the buffer size configurable
cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
cipherBuffer = byteBufferPool.acquire(connection.getOutputBufferSize(), connection.isUseOutputDirectByteBuffers());
int pos = BufferUtil.flipToFill(cipherBuffer);
int drained = quicheConnection.drainCipherBytes(cipherBuffer);
if (LOG.isDebugEnabled())