better align client and server quic connections
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
e8376a1a21
commit
a1b4eafc73
|
@ -354,7 +354,7 @@ public class ClientDatagramConnector extends ContainerLifeCycle implements IClie
|
|||
{
|
||||
Connect connect = (Connect)attachment;
|
||||
Map<String, Object> contextMap = connect.getContext();
|
||||
return new QuicConnection(executor, scheduler, byteBufferPool, endPoint, contextMap, quicheConfig);
|
||||
return new QuicConnection(executor, scheduler, byteBufferPool, endPoint, quicheConfig, contextMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -45,22 +45,23 @@ public class QuicConnection extends AbstractConnection
|
|||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(QuicConnection.class);
|
||||
|
||||
private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>();
|
||||
private final Map<InetSocketAddress, QuicSession> pendingSessions = new ConcurrentHashMap<>();
|
||||
private final Map<String, Object> context;
|
||||
|
||||
private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>();
|
||||
private final Scheduler scheduler;
|
||||
private final ByteBufferPool byteBufferPool;
|
||||
private final QuicheConfig quicheConfig;
|
||||
private final Flusher flusher = new Flusher();
|
||||
private final Map<String, Object> context;
|
||||
|
||||
public QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endPoint, Map<String, Object> context, QuicheConfig quicheConfig)
|
||||
public QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endp, QuicheConfig quicheConfig, Map<String, Object> context)
|
||||
{
|
||||
super(endPoint, executor);
|
||||
super(endp, executor);
|
||||
this.scheduler = scheduler;
|
||||
this.byteBufferPool = byteBufferPool;
|
||||
this.context = context;
|
||||
this.quicheConfig = quicheConfig;
|
||||
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
void onClose(QuicheConnectionId quicheConnectionId)
|
||||
|
|
|
@ -13,14 +13,12 @@
|
|||
|
||||
package org.eclipse.jetty.http3.server;
|
||||
|
||||
import java.io.File;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.http3.common.QuicDatagramEndPoint;
|
||||
import org.eclipse.jetty.http3.quiche.QuicheConfig;
|
||||
|
@ -29,11 +27,13 @@ import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
|
|||
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.Connector;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IteratingCallback;
|
||||
import org.eclipse.jetty.util.thread.AutoLock;
|
||||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -41,43 +41,22 @@ public class QuicConnection extends AbstractConnection
|
|||
{
|
||||
private static final Logger LOG = LoggerFactory.getLogger(QuicConnection.class);
|
||||
|
||||
private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>();
|
||||
private final Connector connector;
|
||||
private final QuicheConfig quicheConfig;
|
||||
|
||||
private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>();
|
||||
private final Scheduler scheduler;
|
||||
private final ByteBufferPool byteBufferPool;
|
||||
private final QuicheConfig quicheConfig;
|
||||
private final Flusher flusher = new Flusher();
|
||||
|
||||
public QuicConnection(Connector connector, QuicDatagramEndPoint endp)
|
||||
public QuicConnection(Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, EndPoint endp, QuicheConfig quicheConfig, Connector connector)
|
||||
{
|
||||
super(endp, connector.getExecutor());
|
||||
super(endp, executor);
|
||||
this.scheduler = scheduler;
|
||||
this.byteBufferPool = byteBufferPool;
|
||||
this.quicheConfig = quicheConfig;
|
||||
|
||||
this.connector = connector;
|
||||
this.byteBufferPool = connector.getByteBufferPool();
|
||||
|
||||
File[] files;
|
||||
try
|
||||
{
|
||||
SSLKeyPair keyPair;
|
||||
keyPair = new SSLKeyPair(new File("src/test/resources/keystore.p12"), "PKCS12", "storepwd".toCharArray(), "mykey", "storepwd".toCharArray());
|
||||
files = keyPair.export(new File(System.getProperty("java.io.tmpdir")));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
// TODO make the QuicheConfig configurable
|
||||
quicheConfig = new QuicheConfig();
|
||||
quicheConfig.setPrivKeyPemPath(files[0].getPath());
|
||||
quicheConfig.setCertChainPemPath(files[1].getPath());
|
||||
quicheConfig.setVerifyPeer(false);
|
||||
quicheConfig.setMaxIdleTimeout(5000L);
|
||||
quicheConfig.setInitialMaxData(10000000L);
|
||||
quicheConfig.setInitialMaxStreamDataBidiLocal(10000000L);
|
||||
quicheConfig.setInitialMaxStreamDataBidiRemote(10000000L);
|
||||
quicheConfig.setInitialMaxStreamDataUni(10000000L);
|
||||
quicheConfig.setInitialMaxStreamsBidi(100L);
|
||||
quicheConfig.setCongestionControl(QuicheConfig.CongestionControl.RENO);
|
||||
quicheConfig.setApplicationProtos(getProtocols().toArray(new String[0]));
|
||||
}
|
||||
|
||||
void onClose(QuicheConnectionId quicheConnectionId)
|
||||
|
@ -92,13 +71,6 @@ public class QuicConnection extends AbstractConnection
|
|||
super.close();
|
||||
}
|
||||
|
||||
private Collection<String> getProtocols()
|
||||
{
|
||||
List<String> protocols = connector.getProtocols();
|
||||
protocols.add(0, "http/0.9"); // TODO this is only needed for Quiche example clients
|
||||
return protocols;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen()
|
||||
{
|
||||
|
@ -172,7 +144,7 @@ public class QuicConnection extends AbstractConnection
|
|||
}
|
||||
else
|
||||
{
|
||||
session = new QuicSession(connector, quicheConnectionId, quicheConnection, this, remoteAddress);
|
||||
session = new QuicSession(connector, getExecutor(), scheduler, byteBufferPool, quicheConnectionId, quicheConnection, this, remoteAddress);
|
||||
sessions.putIfAbsent(quicheConnectionId, session);
|
||||
session.flush(); // send the response packet(s) that accept generated.
|
||||
if (LOG.isDebugEnabled())
|
||||
|
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
|||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.eclipse.jetty.http3.quiche.QuicheConnection;
|
||||
|
@ -49,6 +50,7 @@ public class QuicSession
|
|||
|
||||
private final Flusher flusher;
|
||||
private final Connector connector;
|
||||
private final ByteBufferPool byteBufferPool;
|
||||
private final QuicheConnectionId quicheConnectionId;
|
||||
private final QuicheConnection quicheConnection;
|
||||
private final QuicConnection connection;
|
||||
|
@ -58,21 +60,22 @@ public class QuicSession
|
|||
private final Queue<Runnable> strategyQueue = new ArrayDeque<>();
|
||||
private InetSocketAddress remoteAddress;
|
||||
|
||||
QuicSession(Connector connector, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress)
|
||||
QuicSession(Connector connector, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress)
|
||||
{
|
||||
this.connector = connector;
|
||||
this.byteBufferPool = byteBufferPool;
|
||||
this.quicheConnectionId = quicheConnectionId;
|
||||
this.quicheConnection = quicheConnection;
|
||||
this.connection = connection;
|
||||
this.remoteAddress = remoteAddress;
|
||||
this.flusher = new Flusher(connector.getScheduler());
|
||||
this.flusher = new Flusher(scheduler);
|
||||
this.strategy = new EatWhatYouKill(() ->
|
||||
{
|
||||
try (AutoLock l = strategyQueueLock.lock())
|
||||
{
|
||||
return strategyQueue.poll();
|
||||
}
|
||||
}, connector.getExecutor());
|
||||
}, executor);
|
||||
LifeCycle.start(strategy);
|
||||
}
|
||||
|
||||
|
@ -280,7 +283,6 @@ public class QuicSession
|
|||
@Override
|
||||
protected Action process() throws IOException
|
||||
{
|
||||
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
|
||||
cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN, true);
|
||||
int pos = BufferUtil.flipToFill(cipherBuffer);
|
||||
int drained = quicheConnection.drainCipherText(cipherBuffer);
|
||||
|
@ -309,7 +311,6 @@ public class QuicSession
|
|||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
super.succeeded();
|
||||
}
|
||||
|
@ -317,7 +318,6 @@ public class QuicSession
|
|||
@Override
|
||||
protected void onCompleteFailure(Throwable cause)
|
||||
{
|
||||
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
|
||||
byteBufferPool.release(cipherBuffer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
package org.eclipse.jetty.http3.server;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
|
@ -21,12 +22,12 @@ import java.nio.channels.SelectableChannel;
|
|||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.util.EventListener;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.eclipse.jetty.http3.common.QuicDatagramEndPoint;
|
||||
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
|
||||
import org.eclipse.jetty.http3.quiche.QuicheConfig;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
|
@ -42,6 +43,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
|
|||
public class ServerDatagramConnector extends AbstractNetworkConnector
|
||||
{
|
||||
private final ServerDatagramSelectorManager _manager;
|
||||
private final QuicheConfig _quicheConfig;
|
||||
private volatile DatagramChannel _datagramChannel;
|
||||
private volatile int _localPort = -1;
|
||||
|
||||
|
@ -58,11 +60,33 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
|
|||
addBean(_manager, true);
|
||||
setAcceptorPriorityDelta(-2);
|
||||
|
||||
// Force loading libquiche here.
|
||||
long before = System.nanoTime();
|
||||
LibQuiche.Logging.enable();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Loading libquiche took {} ms", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - before));
|
||||
File[] files;
|
||||
try
|
||||
{
|
||||
SSLKeyPair keyPair;
|
||||
keyPair = new SSLKeyPair(new File("src/test/resources/keystore.p12"), "PKCS12", "storepwd".toCharArray(), "mykey", "storepwd".toCharArray());
|
||||
files = keyPair.export(new File(System.getProperty("java.io.tmpdir")));
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
// TODO make the QuicheConfig configurable
|
||||
_quicheConfig = new QuicheConfig();
|
||||
_quicheConfig.setPrivKeyPemPath(files[0].getPath());
|
||||
_quicheConfig.setCertChainPemPath(files[1].getPath());
|
||||
_quicheConfig.setVerifyPeer(false);
|
||||
_quicheConfig.setMaxIdleTimeout(5000L);
|
||||
_quicheConfig.setInitialMaxData(10000000L);
|
||||
_quicheConfig.setInitialMaxStreamDataBidiLocal(10000000L);
|
||||
_quicheConfig.setInitialMaxStreamDataBidiRemote(10000000L);
|
||||
_quicheConfig.setInitialMaxStreamDataUni(10000000L);
|
||||
_quicheConfig.setInitialMaxStreamsBidi(100L);
|
||||
_quicheConfig.setCongestionControl(QuicheConfig.CongestionControl.RENO);
|
||||
List<String> protocols = getProtocols();
|
||||
protocols.add(0, "http/0.9"); // TODO this is only needed for Quiche example clients
|
||||
_quicheConfig.setApplicationProtos(protocols.toArray(new String[0]));
|
||||
}
|
||||
|
||||
public ServerDatagramConnector(
|
||||
|
@ -187,7 +211,7 @@ public class ServerDatagramConnector extends AbstractNetworkConnector
|
|||
@Override
|
||||
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
|
||||
{
|
||||
return new QuicConnection(ServerDatagramConnector.this, (QuicDatagramEndPoint)endpoint);
|
||||
return new QuicConnection(getExecutor(), getScheduler(), getByteBufferPool(), (QuicDatagramEndPoint)endpoint, _quicheConfig, ServerDatagramConnector.this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue