diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java index b7628584561..1727eaec4e1 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/api/Stream.java @@ -87,6 +87,14 @@ public interface Stream *

{@link Stream.Data} objects may be stored away for later, asynchronous, * processing (for example, to process them only when all of them have been * received).

+ *

This method must only be called when there is no outstanding + * {@link #demand() demand}.

+ *

Practically, this means that this method should be called either + * synchronously from within {@link Stream.Listener#onDataAvailable(Stream)}, + * or applications must arrange, for example using a + * {@link java.util.concurrent.Semaphore}, that a call to + * {@link Stream.Listener#onDataAvailable(Stream)} is made before + * calling this method (possibly from a different thread).

* * @return a {@link Stream.Data} object containing the request bytes or * the response bytes, or null if no bytes are available diff --git a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java index 0e5a55feefd..02e76200c39 100644 --- a/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java +++ b/jetty-http3/http3-common/src/main/java/org/eclipse/jetty/http3/internal/HTTP3StreamConnection.java @@ -131,6 +131,9 @@ public abstract class HTTP3StreamConnection extends AbstractConnection if (LOG.isDebugEnabled()) LOG.debug("reading data on {}", this); + if (hasDemand()) + throw new IllegalStateException("invalid call to readData(): outstanding demand"); + switch (parseAndFill()) { case FRAME: diff --git a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java index 9e5f9680e04..691345f75ef 100644 --- a/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java +++ b/jetty-http3/http3-tests/src/test/java/org/eclipse/jetty/http3/tests/HTTP3DataDemandTest.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -465,4 +466,84 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testBlockingReadInADifferentThread() throws Exception + { + CountDownLatch blockLatch = new CountDownLatch(1); + CountDownLatch dataLatch = new CountDownLatch(1); + startServer(new Session.Server.Listener() + { + @Override + public Stream.Listener onRequest(Stream stream, HeadersFrame frame) + { + stream.demand(); + + // Simulate a thread dispatched to read the request content with blocking I/O. + Semaphore semaphore = new Semaphore(0); + new Thread(() -> + { + try + { + // Wait for onDataAvailable() to be called before start reading. + semaphore.acquire(); + while (true) + { + Stream.Data data = stream.readData(); + if (data != null) + { + // Consume the data. + data.complete(); + if (data.isLast()) + { + dataLatch.countDown(); + return; + } + } + else + { + // Demand and block. + stream.demand(); + blockLatch.countDown(); + semaphore.acquire(); + } + } + } + catch (InterruptedException x) + { + x.printStackTrace(); + } + }).start(); + + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + semaphore.release(); + } + }; + } + }); + startClient(); + + Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {}) + .get(5, TimeUnit.SECONDS); + + HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/"); + MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY); + HeadersFrame request = new HeadersFrame(metaData, false); + Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS); + + // Send a first chunk of data. + stream.data(new DataFrame(ByteBuffer.allocate(16 * 1024), false)); + + // Wait some time until the server reads no data after the first chunk. + assertTrue(blockLatch.await(5, TimeUnit.SECONDS)); + + // Send the last chunk of data. + stream.data(new DataFrame(ByteBuffer.allocate(32 * 1024), true)); + + assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + } }