Fixed flaky test ClientServerTest.testReadDataFromOnRequestWithoutDemanding().

Removed assumption that returning a null stream listener implies that the stream input must be shut down,
because the reads may be performed in a spin loop in another thread, without demanding.

Rewritten the test to avoid blocking the thread that called onRequest(), otherwise the data frames cannot
be read from the network.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-05-17 22:57:23 +02:00
parent b018261a85
commit eaeb633fbe
No known key found for this signature in database
GPG Key ID: 1677D141BCF3584D
3 changed files with 27 additions and 27 deletions

View File

@ -123,8 +123,6 @@ public class HTTP3SessionClient extends HTTP3Session implements Session.Client
{
if (x == null)
{
if (listener == null)
endPoint.shutdownInput(HTTP3ErrorCode.NO_ERROR.code());
stream.updateClose(frame.isLast(), true);
promise.succeeded(stream);
}

View File

@ -17,7 +17,6 @@ import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http3.HTTP3ErrorCode;
import org.eclipse.jetty.http3.HTTP3Session;
import org.eclipse.jetty.http3.HTTP3Stream;
import org.eclipse.jetty.http3.MessageFlusher;
@ -27,7 +26,6 @@ import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,11 +47,7 @@ public class HTTP3StreamServer extends HTTP3Stream implements Stream.Server
onHeaders(frame);
Listener listener = this.listener = notifyRequest(frame);
if (listener == null)
{
QuicStreamEndPoint endPoint = getEndPoint();
Callback callback = Callback.from(Invocable.InvocationType.NON_BLOCKING, () -> endPoint.shutdownInput(HTTP3ErrorCode.NO_ERROR.code()));
getSession().writeMessageFrame(getId(), new MessageFlusher.FlushFrame(), callback);
}
getSession().writeMessageFrame(getId(), new MessageFlusher.FlushFrame(), Callback.NOOP);
updateClose(frame.isLast(), false);
}
}

View File

@ -44,7 +44,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -564,40 +563,47 @@ public class ClientServerTest extends AbstractClientServerTest
}
@Test
public void testReadDataFromOnRequest() throws Exception
public void testReadDataFromOnRequestWithoutDemanding() throws Exception
{
CountDownLatch requestLatch = new CountDownLatch(1);
CountDownLatch data1Latch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
requestLatch.countDown();
Stream.Data data = await().atMost(5, TimeUnit.SECONDS).until(stream::readData, notNullValue());
data.release();
stream.demand();
data1Latch.countDown();
return new Stream.Server.Listener()
// This thread cannot be blocked otherwise we never
// read from the network what the client is sending.
// Therefore, spawn a new thread to read the content
// in a spin loop without calling demand().
new Thread(() -> readWithoutDemanding(stream)).start();
return null;
}
private void readWithoutDemanding(Stream.Server stream)
{
@Override
public void onDataAvailable(Stream.Server stream)
try
{
while (true)
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return;
Thread.sleep(100);
continue;
}
data.release();
if (!data.isLast())
if (data.isLast())
{
stream.demand();
return;
}
stream.respond(new HeadersFrame(new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_3, HttpFields.EMPTY), true));
break;
}
}
}
catch (InterruptedException ignored)
{
}
};
}
});
@ -615,10 +621,12 @@ public class ClientServerTest extends AbstractClientServerTest
.get(5, TimeUnit.SECONDS);
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
Thread.sleep(500);
clientStream.data(new DataFrame(ByteBuffer.allocate(1024), false));
assertTrue(data1Latch.await(555, TimeUnit.SECONDS));
Thread.sleep(500);
clientStream.data(new DataFrame(ByteBuffer.allocate(512), true));
assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
}
}