Fixes #7774 - Flaky Test: DataDemandTest.testBlockingReadInADifferentThread(). (#7776)

Do not try to release the network buffer after having notified onDataAvailable()
because it can be in a race with the application trying to read data and also
trying to release the network buffer.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2022-03-25 11:30:53 +01:00 committed by GitHub
parent a35719367b
commit 40e7d6a716
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 73 additions and 4 deletions

View File

@ -90,6 +90,13 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
fillInterested();
}
@Override
public void onClose(Throwable cause)
{
tryReleaseBuffer(true);
super.onClose(cause);
}
@Override
protected boolean onReadTimeout(Throwable timeout)
{
@ -113,7 +120,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
processDataDemand();
if (!parserDataMode)
{
if (buffer != null && buffer.hasRemaining())
if (hasBuffer() && buffer.hasRemaining())
processNonDataFrames();
else
fillInterested();
@ -166,8 +173,8 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
LOG.debug("setting fill interest on {}", this);
fillInterested();
}
tryReleaseBuffer(false);
}
tryReleaseBuffer(false);
return;
}
}
@ -346,7 +353,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private void tryAcquireBuffer()
{
if (buffer == null)
if (!hasBuffer())
{
buffer = buffers.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
@ -356,7 +363,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private void tryReleaseBuffer(boolean force)
{
if (buffer != null)
if (hasBuffer())
{
if (buffer.hasRemaining() && force)
buffer.clear();
@ -370,6 +377,11 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
}
}
public boolean hasBuffer()
{
return buffer != null;
}
private MessageParser.Result parseAndFill(boolean setFillInterest)
{
try

View File

@ -32,6 +32,8 @@ import org.eclipse.jetty.http3.api.Stream;
import org.eclipse.jetty.http3.frames.DataFrame;
import org.eclipse.jetty.http3.frames.HeadersFrame;
import org.eclipse.jetty.http3.internal.HTTP3Stream;
import org.eclipse.jetty.http3.internal.HTTP3StreamConnection;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.BufferUtil;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
@ -40,6 +42,7 @@ import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -584,4 +587,58 @@ public class DataDemandTest extends AbstractClientServerTest
assertTrue(lastDataLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testOnDataAvailableThenNoReadThenIdleTimeoutReleasesNetworkBuffer() throws Exception
{
long idleTimeout = 1000;
CountDownLatch onDataLatch = new CountDownLatch(1);
CountDownLatch idleLatch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);
start(new Session.Server.Listener()
{
@Override
public Stream.Server.Listener onRequest(Stream.Server stream, HeadersFrame frame)
{
HTTP3Stream http3Stream = (HTTP3Stream)stream;
http3Stream.setIdleTimeout(idleTimeout);
http3Stream.getEndPoint().getConnection().addEventListener(new Connection.Listener.Adapter()
{
@Override
public void onClosed(Connection connection)
{
assertFalse(((HTTP3StreamConnection)connection).hasBuffer());
closeLatch.countDown();
}
});
stream.demand();
return new Stream.Server.Listener()
{
@Override
public void onDataAvailable(Stream.Server stream)
{
// Do not read.
onDataLatch.countDown();
}
@Override
public boolean onIdleTimeout(Stream.Server stream, Throwable failure)
{
idleLatch.countDown();
return true;
}
};
}
});
Session.Client session = newSession(new Session.Client.Listener() {});
HeadersFrame request = new HeadersFrame(newRequest("/"), false);
Stream stream = session.newRequest(request, new Stream.Client.Listener() {}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(ByteBuffer.allocate(16 * 1024), true));
assertTrue(onDataLatch.await(5, TimeUnit.SECONDS));
assertTrue(idleLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS));
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
}