diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3Client.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3Client.java index 781869e85dc..fab8794ea27 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3Client.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/HTTP3Client.java @@ -18,12 +18,15 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.UnaryOperator; import org.eclipse.jetty.http3.api.Session; import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.quic.client.ClientQuicConnection; import org.eclipse.jetty.quic.client.QuicClientConnectorConfigurator; +import org.eclipse.jetty.quic.common.QuicConnection; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.component.ContainerLifeCycle; @@ -47,6 +50,10 @@ public class HTTP3Client extends ContainerLifeCycle private final ClientConnector connector; private List protocols = List.of("h3"); private long streamIdleTimeout = 30000; + private int inputBufferSize = 2048; + private int outputBufferSize = 2048; + private boolean useInputDirectByteBuffers = true; + private boolean useOutputDirectByteBuffers = true; public HTTP3Client() { @@ -54,6 +61,46 @@ public class HTTP3Client extends ContainerLifeCycle addBean(connector); } + public int getInputBufferSize() + { + return inputBufferSize; + } + + public void setInputBufferSize(int inputBufferSize) + { + this.inputBufferSize = inputBufferSize; + } + + public int getOutputBufferSize() + { + return outputBufferSize; + } + + public void setOutputBufferSize(int outputBufferSize) + { + this.outputBufferSize = outputBufferSize; + } + + public boolean isUseInputDirectByteBuffers() + { + return useInputDirectByteBuffers; + } + + public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers) + { + this.useInputDirectByteBuffers = useInputDirectByteBuffers; + } + + public boolean isUseOutputDirectByteBuffers() + { + return useOutputDirectByteBuffers; + } + + public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers) + { + this.useOutputDirectByteBuffers = useOutputDirectByteBuffers; + } + @ManagedAttribute("The ALPN protocol list") public List getProtocols() { @@ -87,6 +134,7 @@ public class HTTP3Client extends ContainerLifeCycle context.put(ClientQuicConnection.APPLICATION_PROTOCOLS, getProtocols()); context.put(ClientConnector.CLIENT_CONNECTION_FACTORY_CONTEXT_KEY, factory); context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, Promise.from(ioConnection -> {}, completable::failed)); + context.put(QuicClientConnectorConfigurator.CONNECTION_CONFIGURATOR_CONTEXT_KEY, (UnaryOperator)this::configureConnection); if (LOG.isDebugEnabled()) LOG.debug("connecting to {}", address); @@ -94,4 +142,17 @@ public class HTTP3Client extends ContainerLifeCycle connector.connect(address, context); return completable; } + + private Connection configureConnection(Connection connection) + { + if (connection instanceof QuicConnection) + { + QuicConnection quicConnection = (QuicConnection)connection; + quicConnection.setInputBufferSize(getInputBufferSize()); + quicConnection.setOutputBufferSize(getOutputBufferSize()); + quicConnection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers()); + quicConnection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers()); + } + return connection; + } } diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index 7a563c1eabd..5056bd66b9c 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -44,14 +44,14 @@ public class ClientHTTP3Session extends ClientProtocolSession private final QpackEncoder encoder; private final QpackDecoder decoder; - private final HTTP3SessionClient applicationSession; + private final HTTP3SessionClient session; private final ControlFlusher controlFlusher; private final HTTP3Flusher messageFlusher; public ClientHTTP3Session(ClientQuicSession session, Session.Client.Listener listener, Promise promise, int maxBlockedStreams, int maxResponseHeadersSize) { super(session); - this.applicationSession = new HTTP3SessionClient(this, listener, promise); + this.session = new HTTP3SessionClient(this, listener, promise); if (LOG.isDebugEnabled()) LOG.debug("initializing HTTP/3 streams"); @@ -87,29 +87,29 @@ public class ClientHTTP3Session extends ClientProtocolSession public HTTP3SessionClient getSessionClient() { - return applicationSession; + return session; } public long getStreamIdleTimeout() { - return applicationSession.getStreamIdleTimeout(); + return session.getStreamIdleTimeout(); } public void setStreamIdleTimeout(long streamIdleTimeout) { - applicationSession.setStreamIdleTimeout(streamIdleTimeout); + session.setStreamIdleTimeout(streamIdleTimeout); } @Override public void onOpen() { // Queue the mandatory SETTINGS frame. - Map settings = applicationSession.onPreface(); + Map settings = session.onPreface(); if (settings == null) settings = Map.of(); // TODO: add default settings. SettingsFrame frame = new SettingsFrame(settings); - controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, applicationSession::onOpen, this::fail)); + controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail)); controlFlusher.iterate(); } @@ -154,12 +154,12 @@ public class ClientHTTP3Session extends ClientProtocolSession { if (LOG.isDebugEnabled()) LOG.debug("session closed remotely {} {}", closeInfo, this); - applicationSession.notifySessionFailure(closeInfo.error(), closeInfo.reason()); + session.notifySessionFailure(closeInfo.error(), closeInfo.reason()); } private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint) { - UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession); + UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, session); endPoint.setConnection(connection); endPoint.onOpen(); connection.onOpen(); @@ -174,6 +174,6 @@ public class ClientHTTP3Session extends ClientProtocolSession public void onDataAvailable(long streamId) { - applicationSession.onDataAvailable(streamId); + session.onDataAvailable(streamId); } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index 9931da7471a..49c672edab0 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -43,14 +43,14 @@ public class ServerHTTP3Session extends ServerProtocolSession private final QpackEncoder encoder; private final QpackDecoder decoder; - private final HTTP3SessionServer applicationSession; + private final HTTP3SessionServer session; private final ControlFlusher controlFlusher; private final HTTP3Flusher messageFlusher; public ServerHTTP3Session(ServerQuicSession session, Session.Server.Listener listener, int maxBlockedStreams, int maxRequestHeadersSize) { super(session); - this.applicationSession = new HTTP3SessionServer(this, listener); + this.session = new HTTP3SessionServer(this, listener); if (LOG.isDebugEnabled()) LOG.debug("initializing HTTP/3 streams"); @@ -86,29 +86,29 @@ public class ServerHTTP3Session extends ServerProtocolSession public HTTP3SessionServer getSessionServer() { - return applicationSession; + return session; } public long getStreamIdleTimeout() { - return applicationSession.getStreamIdleTimeout(); + return session.getStreamIdleTimeout(); } public void setStreamIdleTimeout(long streamIdleTimeout) { - applicationSession.setStreamIdleTimeout(streamIdleTimeout); + session.setStreamIdleTimeout(streamIdleTimeout); } @Override public void onOpen() { // Queue the mandatory SETTINGS frame. - Map settings = applicationSession.onPreface(); + Map settings = session.onPreface(); if (settings == null) settings = Map.of(); // TODO: add default settings. SettingsFrame frame = new SettingsFrame(settings); - controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, applicationSession::onOpen, this::fail)); + controlFlusher.offer(frame, Callback.from(Invocable.InvocationType.NON_BLOCKING, session::onOpen, this::fail)); controlFlusher.iterate(); } @@ -158,7 +158,7 @@ public class ServerHTTP3Session extends ServerProtocolSession private void configureUnidirectionalStreamEndPoint(QuicStreamEndPoint endPoint) { - UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, applicationSession); + UnidirectionalStreamConnection connection = new UnidirectionalStreamConnection(endPoint, getQuicSession().getExecutor(), getQuicSession().getByteBufferPool(), encoder, decoder, session); endPoint.setConnection(connection); endPoint.onOpen(); connection.onOpen(); @@ -173,15 +173,15 @@ public class ServerHTTP3Session extends ServerProtocolSession protected void onDataAvailable(long streamId) { - applicationSession.onDataAvailable(streamId); + session.onDataAvailable(streamId); } private void notifySessionFailure(CloseInfo closeInfo) { - Session.Listener listener = applicationSession.getListener(); + Session.Listener listener = session.getListener(); try { - listener.onSessionFailure(applicationSession, closeInfo.error(), closeInfo.reason()); + listener.onSessionFailure(session, closeInfo.error(), closeInfo.reason()); } catch (Throwable x) { diff --git a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/QuicClientConnectorConfigurator.java b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/QuicClientConnectorConfigurator.java index f6e561e5164..00d26e74ea3 100644 --- a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/QuicClientConnectorConfigurator.java +++ b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/QuicClientConnectorConfigurator.java @@ -19,6 +19,7 @@ import java.nio.channels.DatagramChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.util.Map; +import java.util.function.UnaryOperator; import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.io.Connection; @@ -28,6 +29,8 @@ import org.eclipse.jetty.io.ManagedSelector; public class QuicClientConnectorConfigurator extends ClientConnector.Configurator { + public static final String CONNECTION_CONFIGURATOR_CONTEXT_KEY = QuicClientConnectorConfigurator.class.getSimpleName() + ".connectionConfigurator"; + @Override public boolean isIntrinsicallySecure(ClientConnector clientConnector, SocketAddress address) { @@ -50,6 +53,10 @@ public class QuicClientConnectorConfigurator extends ClientConnector.Configurato @Override public Connection newConnection(ClientConnector clientConnector, SocketAddress address, EndPoint endPoint, Map context) { - return new ClientQuicConnection(clientConnector.getExecutor(), clientConnector.getScheduler(), clientConnector.getByteBufferPool(), endPoint, context); + @SuppressWarnings("unchecked") + UnaryOperator configurator = (UnaryOperator)context.get(CONNECTION_CONFIGURATOR_CONTEXT_KEY); + if (configurator == null) + configurator = UnaryOperator.identity(); + return configurator.apply(new ClientQuicConnection(clientConnector.getExecutor(), clientConnector.getScheduler(), clientConnector.getByteBufferPool(), endPoint, context)); } } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java index e2f26747c6b..f03eefdd5a6 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java @@ -27,7 +27,6 @@ import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.DatagramChannelEndPoint; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.quic.quiche.QuicheConnectionId; -import org.eclipse.jetty.quic.quiche.ffi.LibQuiche; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IteratingCallback; @@ -37,11 +36,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - *

A {@link Connection} implementation that receives and sends datagram packets via its associated datagram {@link EndPoint}.

+ *

A {@link Connection} implementation that receives and sends datagram packets via its associated {@link DatagramChannelEndPoint}.

*

The received bytes are peeked to obtain the QUIC connection ID; each QUIC connection ID has an associated * {@link QuicSession}, and the received bytes are then passed to the {@link QuicSession} for processing.

- *

On the receive side, a QuicConnection fans-out to multiple {@link QuicSession}s.

- *

On the send side, many {@link QuicSession}s fan-in to a QuicConnection.

+ *

On the receive side, one QuicConnection fans-out to multiple {@link QuicSession}s.

+ *

On the send side, many {@link QuicSession}s fan-in to one QuicConnection.

*/ public abstract class QuicConnection extends AbstractConnection { @@ -51,6 +50,9 @@ public abstract class QuicConnection extends AbstractConnection private final Scheduler scheduler; private final ByteBufferPool byteBufferPool; private final Flusher flusher = new Flusher(); + private int outputBufferSize; + private boolean useInputDirectByteBuffers; + private boolean useOutputDirectByteBuffers; protected QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint) { @@ -69,10 +71,40 @@ public abstract class QuicConnection extends AbstractConnection return byteBufferPool; } + public int getOutputBufferSize() + { + return outputBufferSize; + } + + public void setOutputBufferSize(int outputBufferSize) + { + this.outputBufferSize = outputBufferSize; + } + + public boolean isUseInputDirectByteBuffers() + { + return useInputDirectByteBuffers; + } + + public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers) + { + this.useInputDirectByteBuffers = useInputDirectByteBuffers; + } + + public boolean isUseOutputDirectByteBuffers() + { + return useOutputDirectByteBuffers; + } + + public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers) + { + this.useOutputDirectByteBuffers = useOutputDirectByteBuffers; + } + protected void closeSession(QuicheConnectionId quicheConnectionId, QuicSession session, Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("closing session of type {} cid={}", getClass().getSimpleName(), quicheConnectionId); + LOG.debug("closing session cid={} {}", quicheConnectionId, this); if (quicheConnectionId != null) sessions.remove(quicheConnectionId); } @@ -81,12 +113,12 @@ public abstract class QuicConnection extends AbstractConnection public void close() { if (LOG.isDebugEnabled()) - LOG.debug("closing connection of type {}", getClass().getSimpleName()); + LOG.debug("closing connection {}", this); sessions.values().forEach(QuicSession::close); sessions.clear(); super.close(); if (LOG.isDebugEnabled()) - LOG.debug("closed connection of type {}", getClass().getSimpleName()); + LOG.debug("closed connection {}", this); } @Override @@ -94,8 +126,7 @@ public abstract class QuicConnection extends AbstractConnection { try { - // TODO make the buffer size configurable - ByteBuffer cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true); + ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); while (true) { BufferUtil.clear(cipherBuffer); @@ -182,7 +213,7 @@ public abstract class QuicConnection extends AbstractConnection { queue.offer(new Entry(callback, address, buffers)); } - flusher.iterate(); + iterate(); } @Override diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index 92eabcabe9c..d8ae09b1ebd 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -34,7 +34,6 @@ import org.eclipse.jetty.io.CyclicTimeout; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.quic.quiche.QuicheConnection; import org.eclipse.jetty.quic.quiche.QuicheConnectionId; -import org.eclipse.jetty.quic.quiche.ffi.LibQuiche; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.component.LifeCycle; @@ -392,8 +391,7 @@ public abstract class QuicSession @Override protected Action process() throws IOException { - // TODO make the buffer size configurable - cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true); + cipherBuffer = byteBufferPool.acquire(connection.getOutputBufferSize(), connection.isUseOutputDirectByteBuffers()); int pos = BufferUtil.flipToFill(cipherBuffer); int drained = quicheConnection.drainCipherBytes(cipherBuffer); if (LOG.isDebugEnabled())