diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java index 77a7f225547..7cc580a7a32 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectorHttpClientTransport.java @@ -33,7 +33,7 @@ public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpC protected AbstractConnectorHttpClientTransport(ClientConnector connector) { this.connector = Objects.requireNonNull(connector); - addBean(connector); + addBean(connector, false); } public ClientConnector getClientConnector() diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java index 15989523899..a501ebf4ea4 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java @@ -16,8 +16,6 @@ package org.eclipse.jetty.http3.internal; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.Queue; import java.util.concurrent.Executor; import org.eclipse.jetty.http.MetaData; @@ -42,7 +40,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection private static final ByteBuffer EMPTY_DATA_FRAME = ByteBuffer.allocate(2); private final AutoLock lock = new AutoLock(); - private final Queue dataFrames = new ArrayDeque<>(); private final RetainableByteBufferPool buffers; private final MessageParser parser; private boolean useInputDirectByteBuffers = true; @@ -50,7 +47,9 @@ public abstract class HTTP3StreamConnection extends AbstractConnection private boolean dataMode; private boolean dataDemand; private boolean dataStalled; + private DataFrame dataFrame; private boolean dataLast; + private boolean noData; private boolean remotelyClosed; public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser) @@ -120,7 +119,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection { while (true) { - if (parseAndFill() == MessageParser.Result.NO_FRAME) + if (parseAndFill(true) == MessageParser.Result.NO_FRAME) break; // TODO: we should also exit if the connection was closed due to errors. @@ -168,11 +167,12 @@ public abstract class HTTP3StreamConnection extends AbstractConnection if (hasDemand()) throw new IllegalStateException("invalid call to readData(): outstanding demand"); - switch (parseAndFill()) + switch (parseAndFill(false)) { case FRAME: { - DataFrame frame = dataFrames.poll(); + DataFrame frame = dataFrame; + dataFrame = null; if (LOG.isDebugEnabled()) LOG.debug("read data {} on {}", frame, this); if (frame == null) @@ -193,6 +193,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection } case NO_FRAME: { + if (LOG.isDebugEnabled()) + LOG.debug("read no data on {}", this); return null; } default: @@ -211,9 +213,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection public void demand() { + boolean interested; boolean process = false; try (AutoLock l = lock.lock()) { + interested = noData; dataDemand = true; if (dataStalled) { @@ -225,6 +229,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection LOG.debug("demand, wasStalled={} on {}", process, this); if (process) processDataDemand(); + else if (interested) + fillInterested(); } public boolean hasDemand() @@ -243,6 +249,14 @@ public abstract class HTTP3StreamConnection extends AbstractConnection } } + private void setNoData(boolean noData) + { + try (AutoLock l = lock.lock()) + { + this.noData = noData; + } + } + private void processDataDemand() { while (true) @@ -275,16 +289,18 @@ public abstract class HTTP3StreamConnection extends AbstractConnection } } - public MessageParser.Result parseAndFill() + public MessageParser.Result parseAndFill(boolean setFillInterest) { try { if (LOG.isDebugEnabled()) - LOG.debug("parse+fill on {} with buffer {}", this, buffer); + LOG.debug("parse+fill interest={} on {} with buffer {}", setFillInterest, this, buffer); if (buffer == null) buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); + setNoData(false); + while (true) { ByteBuffer byteBuffer = buffer.getBuffer(); @@ -320,7 +336,9 @@ public abstract class HTTP3StreamConnection extends AbstractConnection { buffer.release(); buffer = null; - fillInterested(); + setNoData(true); + if (setFillInterest) + fillInterested(); break; } else @@ -355,11 +373,6 @@ public abstract class HTTP3StreamConnection extends AbstractConnection } } - public DataFrame pollContent() - { - return dataFrames.poll(); - } - @Override public String toConnectionString() { @@ -397,9 +410,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection @Override public void onData(long streamId, DataFrame frame) { - remotelyClosed = frame.isLast(); + if (dataFrame != null) + throw new IllegalStateException(); + dataFrame = frame; dataLast = frame.isLast(); - dataFrames.offer(frame); + remotelyClosed = frame.isLast(); super.onData(streamId, frame); } } diff --git a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java index b46c1a0eb60..2feb8854fa6 100644 --- a/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java +++ b/jetty-http3/http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpChannelOverHTTP3.java @@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory; public class HttpChannelOverHTTP3 extends HttpChannel { private static final Logger LOG = LoggerFactory.getLogger(HttpChannelOverHTTP3.class); - private static final HttpInput.Content NULL_CONTENT = new NullContent(); private static final HttpField SERVER_VERSION = new PreEncodedHttpField(HttpHeader.SERVER, HttpConfiguration.SERVER_VERSION); private static final HttpField POWERED_BY = new PreEncodedHttpField(HttpHeader.X_POWERED_BY, HttpConfiguration.SERVER_VERSION); @@ -215,9 +214,6 @@ public class HttpChannelOverHTTP3 extends HttpChannel @Override public boolean needContent() { - if (content == NULL_CONTENT) - content = null; - if (content != null) return true; @@ -230,8 +226,6 @@ public class HttpChannelOverHTTP3 extends HttpChannel { if (content != null) { - if (content == NULL_CONTENT) - return null; HttpInput.Content result = content; if (!result.isSpecial()) content = null; @@ -244,10 +238,7 @@ public class HttpChannelOverHTTP3 extends HttpChannel if (LOG.isDebugEnabled()) LOG.debug("read {} on {}", data, this); if (data == null) - { - content = NULL_CONTENT; return null; - } content = new HttpInput.Content(data.getByteBuffer()) { @@ -305,8 +296,4 @@ public class HttpChannelOverHTTP3 extends HttpChannel { return false; } - - private static class NullContent extends HttpInput.SpecialContent - { - } } diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/DataDemandTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/DataDemandTest.java index a4fd7db22e5..df54016d457 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/DataDemandTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/DataDemandTest.java @@ -511,4 +511,56 @@ public class DataDemandTest extends AbstractClientServerTest assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testReadDataIdempotent() throws Exception + { + CountDownLatch nullDataLatch = new CountDownLatch(1); + CountDownLatch lastDataLatch = new CountDownLatch(1); + start(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + stream.demand(); + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + Stream.Data data = stream.readData(); + if (data == null) + { + // Second attempt to read still has no data, should be idempotent. + assertNull(stream.readData()); + stream.demand(); + nullDataLatch.countDown(); + } + else + { + data.complete(); + if (data.isLast()) + lastDataLatch.countDown(); + else + stream.demand(); + } + } + }; + } + }); + + Session.Client session = newSession(new Session.Client.Listener() {}); + + HeadersFrame request = new HeadersFrame(newRequest("/"), false); + Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + + // Send a first chunk to trigger reads. + stream.data(new DataFrame(ByteBuffer.allocate(16), false)); + + assertTrue(nullDataLatch.await(555, TimeUnit.SECONDS)); + + stream.data(new DataFrame(ByteBuffer.allocate(4096), true)); + + assertTrue(lastDataLatch.await(5, TimeUnit.SECONDS)); + } } diff --git a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java index f279962ebbe..abffcfced97 100644 --- a/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java +++ b/jetty-quic/quic-common/src/main/java/org/eclipse/jetty/quic/common/ProtocolSession.java @@ -94,9 +94,12 @@ public abstract class ProtocolSession extends ContainerLifeCycle List readableStreamIds = session.getReadableStreamIds(); if (LOG.isDebugEnabled()) LOG.debug("readable stream ids: {}", readableStreamIds); - return readableStreamIds.stream() - .map(this::onReadable) - .reduce(false, (result, readable) -> result || readable); + boolean result = false; + for (long readableStreamId : readableStreamIds) + { + result = result || onReadable(readableStreamId); + } + return result; } protected abstract boolean onReadable(long readableStreamId);