Issue #6728 - QUIC and HTTP/3

- Added test case for blocking I/O reads.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2021-09-29 12:49:04 +02:00
parent 49dc5617ac
commit 2115e4bf3e
3 changed files with 92 additions and 0 deletions

View File

@ -87,6 +87,14 @@ public interface Stream
* <p>{@link Stream.Data} objects may be stored away for later, asynchronous,
* processing (for example, to process them only when all of them have been
* received).</p>
* <p>This method <em>must only</em> be called when there is no outstanding
* {@link #demand() demand}.</p>
* <p>Practically, this means that this method should be called either
* synchronously from within {@link Stream.Listener#onDataAvailable(Stream)},
* or applications must arrange, for example using a
* {@link java.util.concurrent.Semaphore}, that a call to
* {@link Stream.Listener#onDataAvailable(Stream)} is made before
* calling this method (possibly from a different thread).</p>
*
* @return a {@link Stream.Data} object containing the request bytes or
* the response bytes, or null if no bytes are available

View File

@ -131,6 +131,9 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
if (LOG.isDebugEnabled())
LOG.debug("reading data on {}", this);
if (hasDemand())
throw new IllegalStateException("invalid call to readData(): outstanding demand");
switch (parseAndFill())
{
case FRAME:

View File

@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -465,4 +466,84 @@ public class HTTP3DataDemandTest extends AbstractHTTP3ClientServerTest
assertTrue(serverDataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBlockingReadInADifferentThread() throws Exception
{
CountDownLatch blockLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
startServer(new Session.Server.Listener()
{
@Override
public Stream.Listener onRequest(Stream stream, HeadersFrame frame)
{
stream.demand();
// Simulate a thread dispatched to read the request content with blocking I/O.
Semaphore semaphore = new Semaphore(0);
new Thread(() ->
{
try
{
// Wait for onDataAvailable() to be called before start reading.
semaphore.acquire();
while (true)
{
Stream.Data data = stream.readData();
if (data != null)
{
// Consume the data.
data.complete();
if (data.isLast())
{
dataLatch.countDown();
return;
}
}
else
{
// Demand and block.
stream.demand();
blockLatch.countDown();
semaphore.acquire();
}
}
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}).start();
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
semaphore.release();
}
};
}
});
startClient();
Session.Client session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Client.Listener() {})
.get(5, TimeUnit.SECONDS);
HttpURI uri = HttpURI.from("https://localhost:" + connector.getLocalPort() + "/");
MetaData.Request metaData = new MetaData.Request(HttpMethod.GET.asString(), uri, HttpVersion.HTTP_3, HttpFields.EMPTY);
HeadersFrame request = new HeadersFrame(metaData, false);
Stream stream = session.newRequest(request, new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
// Send a first chunk of data.
stream.data(new DataFrame(ByteBuffer.allocate(16 * 1024), false));
// Wait some time until the server reads no data after the first chunk.
assertTrue(blockLatch.await(5, TimeUnit.SECONDS));
// Send the last chunk of data.
stream.data(new DataFrame(ByteBuffer.allocate(32 * 1024), true));
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
}
}