From af885c3b49616221db8d356473de44f56a3a43bd Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 19 Oct 2021 12:44:08 +0200 Subject: [PATCH] Issue #6728 - QUIC and HTTP/3 - Improvements to the thread model implementation. Signed-off-by: Simone Bordet --- .../client/internal/ClientHTTP3Session.java | 6 ++-- .../server/internal/HttpChannelOverHTTP3.java | 13 ++++++++ .../server/internal/ServerHTTP3Session.java | 6 ++-- .../quic/client/ClientProtocolSession.java | 5 +-- .../jetty/quic/common/ProtocolSession.java | 32 ++++++++++++------- .../jetty/quic/common/QuicConnection.java | 6 +++- .../jetty/quic/common/QuicSession.java | 1 + .../jetty/quic/common/QuicStreamEndPoint.java | 21 ++++++------ .../quic/server/ServerProtocolSession.java | 4 +-- 9 files changed, 63 insertions(+), 31 deletions(-) diff --git a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java index ae6848c5b14..30aa39d178d 100644 --- a/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java +++ b/jetty-http3/http3-client/src/main/java/org/eclipse/jetty/http3/client/internal/ClientHTTP3Session.java @@ -127,21 +127,21 @@ public class ClientHTTP3Session extends ClientProtocolSession } @Override - protected void onReadable(long readableStreamId) + protected boolean onReadable(long readableStreamId) { StreamType streamType = StreamType.from(readableStreamId); if (streamType == StreamType.CLIENT_BIDIRECTIONAL) { if (LOG.isDebugEnabled()) LOG.debug("bidirectional stream #{} selected for read", readableStreamId); - super.onReadable(readableStreamId); + return super.onReadable(readableStreamId); } else { QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); if (LOG.isDebugEnabled()) LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint); - streamEndPoint.onReadable(); + return streamEndPoint.onReadable(); } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java index 2feb8854fa6..b46c1a0eb60 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; public class HttpChannelOverHTTP3 extends HttpChannel { private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHTTP3.class); + private static final HttpInput.Content NULL_CONTENT = new NullContent(); private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION); @@ -214,6 +215,9 @@ public class HttpChannelOverHTTP3 extends HttpChannel @Override public boolean needContent() { + if (content == NULL_CONTENT) + content = null; + if (content != null) return true; @@ -226,6 +230,8 @@ public class HttpChannelOverHTTP3 extends HttpChannel { if (content != null) { + if (content == NULL_CONTENT) + return null; HttpInput.Content result = content; if (!result.isSpecial()) content = null; @@ -238,7 +244,10 @@ public class HttpChannelOverHTTP3 extends HttpChannel if (LOG.isDebugEnabled()) LOG.debug("read {} on {}", data, this); if (data == null) + { + content = NULL_CONTENT; return null; + } content = new HttpInput.Content(data.getByteBuffer()) { @@ -296,4 +305,8 @@ public class HttpChannelOverHTTP3 extends HttpChannel { return false; } + + private static class NullContent extends HttpInput.SpecialContent + { + } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java index b1c16c550e7..33fb777a18e 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/ServerHTTP3Session.java @@ -127,21 +127,21 @@ public class ServerHTTP3Session extends ServerProtocolSession } @Override - protected void onReadable(long readableStreamId) + protected boolean onReadable(long readableStreamId) { StreamType streamType = StreamType.from(readableStreamId); if (streamType == StreamType.CLIENT_BIDIRECTIONAL) { if (LOG.isDebugEnabled()) LOG.debug("bidirectional stream #{} selected for read", readableStreamId); - super.onReadable(readableStreamId); + return super.onReadable(readableStreamId); } else { QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureUnidirectionalStreamEndPoint); if (LOG.isDebugEnabled()) LOG.debug("unidirectional stream #{} selected for read: {}", readableStreamId, streamEndPoint); - streamEndPoint.onReadable(); + return streamEndPoint.onReadable(); } } diff --git a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java index a7f484d0844..f5f86425746 100644 --- a/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java +++ b/jetty-quic/quic-client/src/main/java/org/eclipse/jetty/quic/client/ClientProtocolSession.java @@ -50,14 +50,15 @@ public class ClientProtocolSession extends ProtocolSession } @Override - protected void onReadable(long readableStreamId) + protected boolean onReadable(long readableStreamId) { // On the client, we need a get-only semantic in case of reads. QuicStreamEndPoint streamEndPoint = getStreamEndPoint(readableStreamId); if (LOG.isDebugEnabled()) LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); if (streamEndPoint != null) - streamEndPoint.onReadable(); + return streamEndPoint.onReadable(); + return false; } @Override diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java index 17bc66b46ae..f279962ebbe 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java @@ -51,6 +51,8 @@ public abstract class ProtocolSession extends ContainerLifeCycle public void process() { + if (LOG.isDebugEnabled()) + LOG.debug("processing {}", this); strategy.produce(); } @@ -87,15 +89,17 @@ public abstract class ProtocolSession extends ContainerLifeCycle streamEndPoint.onWritable(); } - protected void processReadableStreams() + protected boolean processReadableStreams() { List readableStreamIds = session.getReadableStreamIds(); if (LOG.isDebugEnabled()) LOG.debug("readable stream ids: {}", readableStreamIds); - readableStreamIds.forEach(this::onReadable); + return readableStreamIds.stream() + .map(this::onReadable) + .reduce(false, (result, readable) -> result || readable); } - protected abstract void onReadable(long readableStreamId); + protected abstract boolean onReadable(long readableStreamId); public void configureProtocolEndPoint(QuicStreamEndPoint endPoint) { @@ -167,18 +171,24 @@ public abstract class ProtocolSession extends ContainerLifeCycle { Runnable task = poll(); if (LOG.isDebugEnabled()) - LOG.debug("dequeued task {} on {}", task, ProtocolSession.this); + LOG.debug("dequeued existing task {} on {}", task, ProtocolSession.this); if (task != null) return task; - processWritableStreams(); - processReadableStreams(); + while (true) + { + processWritableStreams(); + boolean loop = processReadableStreams(); - task = poll(); - if (LOG.isDebugEnabled()) - LOG.debug("dequeued produced task {} on {}", task, ProtocolSession.this); - if (task != null) - return task; + task = poll(); + if (LOG.isDebugEnabled()) + LOG.debug("dequeued produced task {} on {}", task, ProtocolSession.this); + if (task != null) + return task; + + if (!loop) + break; + } CloseInfo closeInfo = session.getRemoteCloseInfo(); if (closeInfo != null) diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java index d324536740c..f7ff8c86c3d 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicConnection.java @@ -173,7 +173,11 @@ public abstract class QuicConnection extends AbstractConnection try { if (isFillInterested()) + { + if (LOG.isDebugEnabled()) + LOG.debug("receiveAndProcess() idle"); return null; + } ByteBuffer cipherBuffer = byteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); while (true) @@ -253,7 +257,7 @@ public abstract class QuicConnection extends AbstractConnection catch (Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("exception in receiveAndProcess()", x); + LOG.debug("receiveAndProcess() failure", x); // TODO: close? return null; } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java index 5b0cf0e412a..df7cc2f07d2 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicSession.java @@ -349,6 +349,7 @@ public abstract class QuicSession extends ContainerLifeCycle processing.set(false); } + // TODO: this is ugly, is there a better solution? protected Runnable pollTask() { return null; diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java index 6f80955a555..a84e653524d 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java @@ -17,7 +17,6 @@ import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; import org.eclipse.jetty.io.AbstractEndPoint; @@ -41,7 +40,6 @@ public class QuicStreamEndPoint extends AbstractEndPoint private static final Logger LOG = LoggerFactory.getLogger(QuicStreamEndPoint.class); private static final ByteBuffer LAST_FLAG = ByteBuffer.allocate(0); - private final AtomicBoolean readable = new AtomicBoolean(true); private final QuicSession session; private final long streamId; @@ -211,27 +209,32 @@ public class QuicStreamEndPoint extends AbstractEndPoint getWriteFlusher().completeWrite(); } - public void onReadable() + /** + * @return whether this endPoint is interested in reads + */ + public boolean onReadable() { - boolean expected = readable.compareAndExchange(true, false); + boolean interested = isFillInterested(); if (LOG.isDebugEnabled()) - LOG.debug("stream {} is readable, processing: {}", streamId, expected); - if (expected) + LOG.debug("stream {} is readable, processing: {}", streamId, interested); + if (interested) getFillInterest().fillable(); + return interested; } @Override public void fillInterested(Callback callback) { - readable.set(true); super.fillInterested(callback); + getQuicSession().getProtocolSession().process(); } @Override public boolean tryFillInterested(Callback callback) { - readable.set(true); - return super.tryFillInterested(callback); + boolean result = super.tryFillInterested(callback); + getQuicSession().getProtocolSession().process(); + return result; } @Override diff --git a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java index 95a3995b910..c8bf1b36b20 100644 --- a/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java +++ b/jetty-quic/quic-server/src/main/java/org/eclipse/jetty/quic/server/ServerProtocolSession.java @@ -34,13 +34,13 @@ public class ServerProtocolSession extends ProtocolSession } @Override - protected void onReadable(long readableStreamId) + protected boolean onReadable(long readableStreamId) { // On the server, we need a get-or-create semantic in case of reads. QuicStreamEndPoint streamEndPoint = getOrCreateStreamEndPoint(readableStreamId, this::configureProtocolEndPoint); if (LOG.isDebugEnabled()) LOG.debug("stream #{} selected for read: {}", readableStreamId, streamEndPoint); - streamEndPoint.onReadable(); + return streamEndPoint.onReadable(); } @Override