handle quiche connections timeouts

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-17 09:40:08 +01:00 committed by Simone Bordet
parent f0725d09d0
commit 25ee5e35ac
2 changed files with 71 additions and 5 deletions

View File

@ -80,6 +80,11 @@ public class QuicConnection extends AbstractConnection
quicheConfig.setApplicationProtos(getProtocols().toArray(new String[0]));
}
void onClose(QuicheConnectionId quicheConnectionId)
{
sessions.remove(quicheConnectionId);
}
private Collection<String> getProtocols()
{
// TODO get the protocols from the connector
@ -141,7 +146,7 @@ public class QuicConnection extends AbstractConnection
}
else
{
session = new QuicSession(connector, quicheConnection, this, remoteAddress);
session = new QuicSession(connector, quicheConnectionId, quicheConnection, this, remoteAddress);
sessions.putIfAbsent(quicheConnectionId, session);
session.flush();
if (LOG.isDebugEnabled())

View File

@ -19,11 +19,15 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
import org.eclipse.jetty.http3.quiche.ffi.LibQuiche;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Connector;
@ -37,19 +41,22 @@ public class QuicSession
{
private static final Logger LOG = LoggerFactory.getLogger(QuicSession.class);
private final Flusher flusher = new Flusher();
private final Flusher flusher;
private final Connector connector;
private final QuicheConnectionId quicheConnectionId;
private final QuicheConnection quicheConnection;
private final QuicConnection connection;
private final ConcurrentMap<Long, QuicStreamEndPoint> endpoints = new ConcurrentHashMap<>();
private InetSocketAddress remoteAddress;
QuicSession(Connector connector, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress)
QuicSession(Connector connector, QuicheConnectionId quicheConnectionId, QuicheConnection quicheConnection, QuicConnection connection, InetSocketAddress remoteAddress)
{
this.connector = connector;
this.quicheConnectionId = quicheConnectionId;
this.quicheConnection = quicheConnection;
this.connection = connection;
this.remoteAddress = remoteAddress;
this.flusher = new Flusher(connector.getScheduler());
}
public int fill(long streamId, ByteBuffer buffer) throws IOException
@ -182,21 +189,76 @@ public class QuicSession
return endPoint;
}
private void close()
{
if (LOG.isDebugEnabled())
LOG.debug("Closing QUIC session {}", this);
endpoints.values().forEach(AbstractEndPoint::close);
endpoints.clear();
flusher.close();
quicheConnection.dispose();
connection.onClose(quicheConnectionId);
}
@Override
public String toString()
{
return getClass().getSimpleName() + " id=" + quicheConnectionId;
}
private class Flusher extends IteratingCallback
{
private final CyclicTimeout timeout;
private ByteBuffer addressBuffer;
private ByteBuffer cipherBuffer;
public Flusher(Scheduler scheduler)
{
timeout = new CyclicTimeout(scheduler) {
@Override
public void onTimeoutExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("quiche timeout callback");
quicheConnection.onTimeout();
if (quicheConnection.isConnectionClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("quiche connection closed after timeout, re-flushing");
iterate();
}
}
};
}
@Override
protected Action process() throws Throwable
public void close()
{
super.close();
timeout.destroy();
}
@Override
protected Action process() throws IOException
{
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
addressBuffer = QuicConnection.encodeInetSocketAddress(byteBufferPool, remoteAddress);
cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN + ServerDatagramEndPoint.ENCODED_ADDRESS_LENGTH, true);
int pos = BufferUtil.flipToFill(cipherBuffer);
int drained = quicheConnection.drainCipherText(cipherBuffer);
long nextTimeoutInMs = quicheConnection.nextTimeout();
if (LOG.isDebugEnabled())
LOG.debug("next quiche timeout: {} ms", nextTimeoutInMs);
if (nextTimeoutInMs > -1)
timeout.schedule(nextTimeoutInMs, TimeUnit.MILLISECONDS); // TODO is this re-scheduling cancelling the previous timeout?
else
timeout.cancel();
if (drained == 0)
{
if (quicheConnection.isConnectionClosed())
QuicSession.this.close();
return Action.IDLE;
}
BufferUtil.flipToFlush(cipherBuffer, pos);
connection.write(this, addressBuffer, cipherBuffer);
return Action.SCHEDULED;
@ -217,7 +279,6 @@ public class QuicSession
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
byteBufferPool.release(addressBuffer);
byteBufferPool.release(cipherBuffer);
connection.close();
}
}
}