run request handling + flush cipher text to socket

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-16 12:22:18 +01:00 committed by Simone Bordet
parent dfcb673425
commit c2d8688feb
3 changed files with 38 additions and 11 deletions

View File

@ -42,11 +42,13 @@ public class QuicConnection extends AbstractConnection
private final ConcurrentMap<QuicheConnectionId, QuicSession> sessions = new ConcurrentHashMap<>();
private final Connector connector;
private final QuicheConfig quicheConfig;
private final ByteBufferPool byteBufferPool;
public QuicConnection(Connector connector, ServerDatagramEndPoint endp)
{
super(endp, connector.getExecutor());
this.connector = connector;
this.byteBufferPool = connector.getByteBufferPool();
File[] files;
try
@ -93,7 +95,6 @@ public class QuicConnection extends AbstractConnection
{
try
{
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
ByteBuffer cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN + ServerDatagramEndPoint.ENCODED_ADDRESS_LENGTH, true);
BufferUtil.flipToFill(cipherBuffer);
while (true)
@ -167,16 +168,20 @@ public class QuicConnection extends AbstractConnection
for (Long readableStreamId : readableStreamIds)
{
boolean writable = writableStreamIds.remove(readableStreamId);
QuicStreamEndPoint streamEndPoint = session.getOrCreateStreamEndPoint(connector, connector.getScheduler(), getEndPoint().getLocalAddress(), remoteAddress, readableStreamId);
QuicStreamEndPoint streamEndPoint = session.getOrCreateStreamEndPoint(connector, connector.getScheduler(), this, getEndPoint().getLocalAddress(), remoteAddress, readableStreamId);
if (LOG.isDebugEnabled())
LOG.debug("selected endpoint for read{} : {}", (writable ? " and write" : ""), streamEndPoint);
streamEndPoint.onSelected(remoteAddress, true, writable);
Runnable runnable = streamEndPoint.onSelected(remoteAddress, true, writable);
// TODO: run with EWYK
runnable.run();
}
for (Long writableStreamId : writableStreamIds)
{
QuicStreamEndPoint streamEndPoint = session.getOrCreateStreamEndPoint(connector, connector.getScheduler(), getEndPoint().getLocalAddress(), remoteAddress, writableStreamId);
QuicStreamEndPoint streamEndPoint = session.getOrCreateStreamEndPoint(connector, connector.getScheduler(), this, getEndPoint().getLocalAddress(), remoteAddress, writableStreamId);
LOG.debug("selected endpoint for write : {}", streamEndPoint);
streamEndPoint.onSelected(remoteAddress, false, true);
Runnable runnable = streamEndPoint.onSelected(remoteAddress, false, true);
// TODO: run with EWYK
runnable.run();
}
}
}
@ -189,6 +194,20 @@ public class QuicConnection extends AbstractConnection
}
}
public void flushCipherText(QuicheConnection quicheConnection, InetSocketAddress remoteAddress) throws IOException
{
ByteBuffer addressBuffer = createFlushableAddressBuffer(byteBufferPool, remoteAddress);
ByteBuffer cipherBuffer = byteBufferPool.acquire(LibQuiche.QUICHE_MIN_CLIENT_INITIAL_LEN + ServerDatagramEndPoint.ENCODED_ADDRESS_LENGTH, true);
BufferUtil.flipToFill(cipherBuffer);
int drained = quicheConnection.drainCipherText(cipherBuffer);
cipherBuffer.flip();
getEndPoint().write((UnequivocalCallback)x ->
{
byteBufferPool.release(addressBuffer);
byteBufferPool.release(cipherBuffer);
}, addressBuffer, cipherBuffer);
}
private static ByteBuffer createFlushableAddressBuffer(ByteBufferPool byteBufferPool, InetSocketAddress remoteAddress) throws IOException
{
ByteBuffer addressBuffer = byteBufferPool.acquire(ServerDatagramEndPoint.ENCODED_ADDRESS_LENGTH, true);

View File

@ -42,13 +42,13 @@ public class QuicSession
return quicheConnection;
}
public QuicStreamEndPoint getOrCreateStreamEndPoint(Connector connector, Scheduler scheduler, InetSocketAddress localAddress, InetSocketAddress remoteAddress, long streamId)
public QuicStreamEndPoint getOrCreateStreamEndPoint(Connector connector, Scheduler scheduler, QuicConnection quicConnection, InetSocketAddress localAddress, InetSocketAddress remoteAddress, long streamId)
{
QuicStreamEndPoint endPoint = endpoints.compute(streamId, (sid, quicStreamEndPoint) ->
{
if (quicStreamEndPoint == null)
{
quicStreamEndPoint = createQuicStreamEndPoint(connector, scheduler, localAddress, remoteAddress, streamId);
quicStreamEndPoint = createQuicStreamEndPoint(connector, scheduler, quicConnection, localAddress, remoteAddress, streamId);
LOG.debug("creating endpoint for stream {}", sid);
}
return quicStreamEndPoint;
@ -57,9 +57,9 @@ public class QuicSession
return endPoint;
}
private QuicStreamEndPoint createQuicStreamEndPoint(Connector connector, Scheduler scheduler, InetSocketAddress localAddress, InetSocketAddress remoteAddress, long streamId)
private QuicStreamEndPoint createQuicStreamEndPoint(Connector connector, Scheduler scheduler, QuicConnection quicConnection, InetSocketAddress localAddress, InetSocketAddress remoteAddress, long streamId)
{
QuicStreamEndPoint endPoint = new QuicStreamEndPoint(scheduler, streamId, quicheConnection, localAddress, remoteAddress);
QuicStreamEndPoint endPoint = new QuicStreamEndPoint(scheduler, quicheConnection, quicConnection, localAddress, remoteAddress, streamId);
// String negotiatedProtocol = quicheConnection.getNegotiatedProtocol();
// ConnectionFactory connectionFactory = connector.getConnectionFactory(negotiatedProtocol);

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.FillInterest;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
@ -47,17 +48,19 @@ public class QuicStreamEndPoint extends IdleTimeout implements EndPoint
};
private final long streamId;
private final QuicheConnection quicheConnection;
private final QuicConnection quicConnection;
private final InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;
private boolean open;
private Connection connection;
public QuicStreamEndPoint(Scheduler scheduler, long streamId, QuicheConnection quicheConnection, InetSocketAddress localAddress, InetSocketAddress remoteAddress)
public QuicStreamEndPoint(Scheduler scheduler, QuicheConnection quicheConnection, QuicConnection quicConnection, InetSocketAddress localAddress, InetSocketAddress remoteAddress, long streamId)
{
super(scheduler);
this.streamId = streamId;
this.quicheConnection = quicheConnection;
this.quicConnection = quicConnection;
this.localAddress = localAddress;
this.remoteAddress = remoteAddress;
}
@ -120,7 +123,10 @@ public class QuicStreamEndPoint extends IdleTimeout implements EndPoint
@Override
public int fill(ByteBuffer buffer) throws IOException
{
return quicheConnection.drainClearTextForStream(streamId, buffer);
BufferUtil.flipToFill(buffer);
int drained = quicheConnection.drainClearTextForStream(streamId, buffer);
buffer.flip();
return drained;
}
@Override
@ -129,6 +135,8 @@ public class QuicStreamEndPoint extends IdleTimeout implements EndPoint
for (ByteBuffer buffer : buffers)
{
int fed = quicheConnection.feedClearTextForStream(streamId, buffer);
if (fed > 0)
quicConnection.flushCipherText(quicheConnection, remoteAddress);
if (buffer.hasRemaining())
return false;
}