improvements about how tasks are dispatched

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-03-18 17:19:22 +01:00 committed by Simone Bordet
parent 90fbd977d7
commit 22cf07dea8
2 changed files with 18 additions and 14 deletions

View File

@ -19,7 +19,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http3.quiche.QuicheConnection;
import org.eclipse.jetty.http3.quiche.QuicheConnectionId;
@ -111,26 +111,34 @@ public class QuicSession
{
this.remoteAddress = remoteAddress;
quicheConnection.feedCipherText(cipherBufferIn);
flush();
if (quicheConnection.isConnectionEstablished())
{
List<Long> writableStreamIds = quicheConnection.writableStreamIds();
if (LOG.isDebugEnabled())
LOG.debug("writable stream ids: {}", writableStreamIds);
Runnable onWritable = () ->
{
for (Long writableStreamId : writableStreamIds)
{
onWritable(writableStreamId);
}
};
connection.dispatch(onWritable);
List<Long> readableStreamIds = quicheConnection.readableStreamIds();
if (LOG.isDebugEnabled())
LOG.debug("readable stream ids: {}", readableStreamIds);
for (Long readableStreamId : readableStreamIds)
{
onReadable(readableStreamId);
Runnable onReadable = () -> onReadable(readableStreamId);
connection.dispatch(onReadable);
}
}
else
{
flush();
}
}
private void onWritable(long writableStreamId)
@ -146,8 +154,7 @@ public class QuicSession
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId);
if (LOG.isDebugEnabled())
LOG.debug("selected endpoint for read: {}", streamEndPoint);
Runnable runnable = streamEndPoint.onReadable();
connection.dispatch(runnable);
streamEndPoint.onReadable();
}
void flush()
@ -256,7 +263,7 @@ public class QuicSession
if (LOG.isDebugEnabled())
LOG.debug("next quiche timeout: {} ms", nextTimeoutInMs);
if (nextTimeoutInMs > -1)
timeout.schedule(nextTimeoutInMs, TimeUnit.MILLISECONDS); // TODO is this re-scheduling cancelling the previous timeout?
timeout.schedule(nextTimeoutInMs, TimeUnit.MILLISECONDS);
else
timeout.cancel();
if (drained == 0)

View File

@ -89,9 +89,6 @@ public class QuicStreamEndPoint extends AbstractEndPoint
if (LOG.isDebugEnabled())
LOG.debug("Error sending FIN on stream {}", streamId, e);
}
// TODO: we must wait until writeFlusher is idle before moving on
// while (!getWriteFlusher().isIdle())
// Thread.onSpinWait();
super.onClose(failure);
session.onClose(streamId);
}
@ -132,9 +129,9 @@ public class QuicStreamEndPoint extends AbstractEndPoint
getWriteFlusher().completeWrite();
}
public Runnable onReadable()
public void onReadable()
{
return () -> getFillInterest().fillable();
getFillInterest().fillable();
}
@Override