Issue #6728 - QUIC and HTTP/3

- Improvements to the thread model implementation.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-10-19 12:44:08 +02:00
parent 0b5241df6b
commit af885c3b49
9 changed files with 63 additions and 31 deletions

View File

@ -127,21 +127,21 @@ public class ClientHTTP3Session extends ClientProtocolSession
} }
@Override @Override
protected void onReadable(long readableStreamId) protected boolean onReadable(long readableStreamId)
{ {
StreamType streamType = StreamType.from(readableStreamId); StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL) if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("bidirectional stream #{} selected for read", readableStreamId); LOG.debug("bidirectional stream #{} selected for read", readableStreamId);
super.onReadable(readableStreamId); return super.onReadable(readableStreamId);
} }
else else
{ {
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint); LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable(); return streamEndPoint.onReadable();
} }
} }

View File

@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
public class HttpChannelOverHTTP3 extends HttpChannel public class HttpChannelOverHTTP3 extends HttpChannel
{ {
private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHTTP3.class); private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHTTP3.class);
private static final HttpInput.Content NULL_CONTENT = new NullContent();
private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION);
private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION); private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION);
@ -214,6 +215,9 @@ public class HttpChannelOverHTTP3 extends HttpChannel
@Override @Override
public boolean needContent() public boolean needContent()
{ {
if (content == NULL_CONTENT)
content = null;
if (content != null) if (content != null)
return true; return true;
@ -226,6 +230,8 @@ public class HttpChannelOverHTTP3 extends HttpChannel
{ {
if (content != null) if (content != null)
{ {
if (content == NULL_CONTENT)
return null;
HttpInput.Content result = content; HttpInput.Content result = content;
if (!result.isSpecial()) if (!result.isSpecial())
content = null; content = null;
@ -238,7 +244,10 @@ public class HttpChannelOverHTTP3 extends HttpChannel
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("read {} on {}", data, this); LOG.debug("read {} on {}", data, this);
if (data == null) if (data == null)
{
content = NULL_CONTENT;
return null; return null;
}
content = new HttpInput.Content(data.getByteBuffer()) content = new HttpInput.Content(data.getByteBuffer())
{ {
@ -296,4 +305,8 @@ public class HttpChannelOverHTTP3 extends HttpChannel
{ {
return false; return false;
} }
private static class NullContent extends HttpInput.SpecialContent
{
}
} }

View File

@ -127,21 +127,21 @@ public class ServerHTTP3Session extends ServerProtocolSession
} }
@Override @Override
protected void onReadable(long readableStreamId) protected boolean onReadable(long readableStreamId)
{ {
StreamType streamType = StreamType.from(readableStreamId); StreamType streamType = StreamType.from(readableStreamId);
if (streamType == StreamType.CLIENT_BIDIRECTIONAL) if (streamType == StreamType.CLIENT_BIDIRECTIONAL)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("bidirectional stream #{} selected for read", readableStreamId); LOG.debug("bidirectional stream #{} selected for read", readableStreamId);
super.onReadable(readableStreamId); return super.onReadable(readableStreamId);
} }
else else
{ {
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint); LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable(); return streamEndPoint.onReadable();
} }
} }

View File

@ -50,14 +50,15 @@ public class ClientProtocolSession extends ProtocolSession
} }
@Override @Override
protected void onReadable(long readableStreamId) protected boolean onReadable(long readableStreamId)
{ {
// On the client, we need a get-only semantic in case of reads. // On the client, we need a get-only semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getStreamEndPoint(readableStreamId); QuicStreamEndPoint streamEndPoint = getStreamEndPoint(readableStreamId);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
if (streamEndPoint != null) if (streamEndPoint != null)
streamEndPoint.onReadable(); return streamEndPoint.onReadable();
return false;
} }
@Override @Override

View File

@ -51,6 +51,8 @@ public abstract class ProtocolSession extends ContainerLifeCycle
public void process() public void process()
{ {
if (LOG.isDebugEnabled())
LOG.debug("processing {}", this);
strategy.produce(); strategy.produce();
} }
@ -87,15 +89,17 @@ public abstract class ProtocolSession extends ContainerLifeCycle
streamEndPoint.onWritable(); streamEndPoint.onWritable();
} }
protected void processReadableStreams() protected boolean processReadableStreams()
{ {
List<Long> readableStreamIds = session.getReadableStreamIds(); List<Long> readableStreamIds = session.getReadableStreamIds();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("readable stream ids: {}", readableStreamIds); LOG.debug("readable stream ids: {}", readableStreamIds);
readableStreamIds.forEach(this::onReadable); return readableStreamIds.stream()
.map(this::onReadable)
.reduce(false, (result, readable) -> result || readable);
} }
protected abstract void onReadable(long readableStreamId); protected abstract boolean onReadable(long readableStreamId);
public void configureProtocolEndPoint(QuicStreamEndPoint endPoint) public void configureProtocolEndPoint(QuicStreamEndPoint endPoint)
{ {
@ -167,12 +171,14 @@ public abstract class ProtocolSession extends ContainerLifeCycle
{ {
Runnable task = poll(); Runnable task = poll();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("dequeued task {} on {}", task, ProtocolSession.this); LOG.debug("dequeued existing task {} on {}", task, ProtocolSession.this);
if (task != null) if (task != null)
return task; return task;
while (true)
{
processWritableStreams(); processWritableStreams();
processReadableStreams(); boolean loop = processReadableStreams();
task = poll(); task = poll();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -180,6 +186,10 @@ public abstract class ProtocolSession extends ContainerLifeCycle
if (task != null) if (task != null)
return task; return task;
if (!loop)
break;
}
CloseInfo closeInfo = session.getRemoteCloseInfo(); CloseInfo closeInfo = session.getRemoteCloseInfo();
if (closeInfo != null) if (closeInfo != null)
onClose(closeInfo.error(), closeInfo.reason()); onClose(closeInfo.error(), closeInfo.reason());

View File

@ -173,7 +173,11 @@ public abstract class QuicConnection extends AbstractConnection
try try
{ {
if (isFillInterested()) if (isFillInterested())
{
if (LOG.isDebugEnabled())
LOG.debug("receiveAndProcess() idle");
return null; return null;
}
ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
while (true) while (true)
@ -253,7 +257,7 @@ public abstract class QuicConnection extends AbstractConnection
catch (Throwable x) catch (Throwable x)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("exception in receiveAndProcess()", x); LOG.debug("receiveAndProcess() failure", x);
// TODO: close? // TODO: close?
return null; return null;
} }

View File

@ -349,6 +349,7 @@ public abstract class QuicSession extends ContainerLifeCycle
processing.set(false); processing.set(false);
} }
// TODO: this is ugly, is there a better solution?
protected Runnable pollTask() protected Runnable pollTask()
{ {
return null; return null;

View File

@ -17,7 +17,6 @@ import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import org.eclipse.jetty.io.AbstractEndPoint; import org.eclipse.jetty.io.AbstractEndPoint;
@ -41,7 +40,6 @@ public class QuicStreamEndPoint extends AbstractEndPoint
private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class); private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class);
private static final ByteBuffer LAST_FLAG = ByteBuffer.allocate(0); private static final ByteBuffer LAST_FLAG = ByteBuffer.allocate(0);
private final AtomicBoolean readable = new AtomicBoolean(true);
private final QuicSession session; private final QuicSession session;
private final long streamId; private final long streamId;
@ -211,27 +209,32 @@ public class QuicStreamEndPoint extends AbstractEndPoint
getWriteFlusher().completeWrite(); getWriteFlusher().completeWrite();
} }
public void onReadable() /**
* @return whether this endPoint is interested in reads
*/
public boolean onReadable()
{ {
boolean expected = readable.compareAndExchange(true, false); boolean interested = isFillInterested();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("stream {} is readable, processing: {}", streamId, expected); LOG.debug("stream {} is readable, processing: {}", streamId, interested);
if (expected) if (interested)
getFillInterest().fillable(); getFillInterest().fillable();
return interested;
} }
@Override @Override
public void fillInterested(Callback callback) public void fillInterested(Callback callback)
{ {
readable.set(true);
super.fillInterested(callback); super.fillInterested(callback);
getQuicSession().getProtocolSession().process();
} }
@Override @Override
public boolean tryFillInterested(Callback callback) public boolean tryFillInterested(Callback callback)
{ {
readable.set(true); boolean result = super.tryFillInterested(callback);
return super.tryFillInterested(callback); getQuicSession().getProtocolSession().process();
return result;
} }
@Override @Override

View File

@ -34,13 +34,13 @@ public class ServerProtocolSession extends ProtocolSession
} }
@Override @Override
protected void onReadable(long readableStreamId) protected boolean onReadable(long readableStreamId)
{ {
// On the server, we need a get-or-create semantic in case of reads. // On the server, we need a get-or-create semantic in case of reads.
QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint); QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint);
streamEndPoint.onReadable(); return streamEndPoint.onReadable();
} }
@Override @Override