Fixes #10611 - Flaky StreamResetTest.testClientResetConsumesQueuedData() (#10655)

Fixed test case that was racy.
When the DATA frames arrived at the server before the call to consumeAvailable(), they were read and the client flow control window re-opened.
If it happened that the DATA frames arrived at the server after the call to consumeAvailable(), the client flow control window was not re-opened, making the test flaky.
Fixed by avoiding the race in the test.

Added over-release buffer tracking, add leak tracking to H2 tests, fix client leaks in tests.

Also reviewed the places that required re-opening of the flow control window in case the DATA frames are not read.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
Co-authored-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Simone Bordet 2023-10-04 14:47:26 +02:00 committed by GitHub
parent ff25dd8948
commit 53de4c8298
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 251 additions and 45 deletions

View File

@ -162,6 +162,7 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
@Override
public void reset(ResetFrame frame, Callback callback)
{
int flowControlLength;
Throwable resetFailure = null;
try (AutoLock ignored = lock.lock())
{
@ -174,7 +175,9 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
localReset = true;
failure = new EOFException("reset");
}
flowControlLength = drain();
}
session.dataConsumed(this, flowControlLength);
if (resetFailure != null)
callback.failed(resetFailure);
else
@ -340,6 +343,8 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
public void setListener(Listener listener)
{
this.listener = listener;
if (listener == null)
demand();
}
public void process(Frame frame, Callback callback)
@ -418,11 +423,14 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
private void onData(Data data)
{
DataFrame frame = data.frame();
// SPEC: remotely closed streams must be replied with a reset.
if (isRemotelyClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("Data {} for already closed {}", data, this);
session.dataConsumed(this, data.frame().flowControlLength());
reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.NOOP);
return;
}
@ -432,28 +440,25 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
// Just drop the frame.
if (LOG.isDebugEnabled())
LOG.debug("Data {} for already reset {}", data, this);
session.dataConsumed(this, data.frame().flowControlLength());
return;
}
if (dataLength >= 0)
{
DataFrame frame = data.frame();
dataLength -= frame.remaining();
if (dataLength < 0 || (frame.isEndStream() && dataLength != 0))
{
if (LOG.isDebugEnabled())
LOG.debug("Invalid data length {} for {}", data, this);
session.dataConsumed(this, data.frame().flowControlLength());
reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR.code), Callback.NOOP);
return;
}
}
boolean listenerPresent = getListener() != null;
boolean endStream = data.frame().isEndStream();
if ((listenerPresent || endStream) && offer(data))
if (offer(data))
processData();
if (!listenerPresent && updateClose(endStream, CloseState.Event.RECEIVED))
session.removeStream(this);
}
private boolean offer(Data data)
@ -555,15 +560,29 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
}
}
public long getDataLength()
{
try (AutoLock ignored = lock.lock())
{
return dataQueue.stream()
.mapToLong(data -> data.frame().remaining())
.sum();
}
}
private void onReset(ResetFrame frame, Callback callback)
{
int flowControlLength;
try (AutoLock ignored = lock.lock())
{
remoteReset = true;
failure = new EofException("reset");
flowControlLength = drain();
}
close();
if (session.removeStream(this))
boolean removed = session.removeStream(this);
session.dataConsumed(this, flowControlLength);
if (removed)
notifyReset(this, frame, callback);
else
callback.succeeded();
@ -584,17 +603,44 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum
private void onFailure(FailureFrame frame, Callback callback)
{
int flowControlLength;
try (AutoLock ignored = lock.lock())
{
failure = frame.getFailure();
flowControlLength = drain();
}
close();
if (session.removeStream(this))
boolean removed = session.removeStream(this);
session.dataConsumed(this, flowControlLength);
if (removed)
notifyFailure(this, frame, callback);
else
callback.succeeded();
}
private int drain()
{
assert lock.isHeldByCurrentThread();
int length = 0;
while (true)
{
Data data = dataQueue.poll();
if (data == null)
break;
data.release();
DataFrame frame = data.frame();
length += frame.flowControlLength();
if (frame.isEndStream())
{
dataQueue.offer(Data.eof(getId()));
break;
}
}
if (LOG.isDebugEnabled())
LOG.debug("Drained {} bytes for {}", length, this);
return length;
}
public boolean updateClose(boolean update, CloseState.Event event)
{
if (LOG.isDebugEnabled())

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
@ -42,12 +43,18 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
public class AbstractTest
{
protected Server server;
protected ServerConnector connector;
protected HTTP2Client http2Client;
protected HttpClient httpClient;
private ArrayByteBufferPool.Tracking serverBufferPool;
private ArrayByteBufferPool.Tracking clientBufferPool;
protected void start(Handler handler) throws Exception
{
@ -84,7 +91,8 @@ public class AbstractTest
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName("server");
server = new Server(serverExecutor);
serverBufferPool = new ArrayByteBufferPool.Tracking();
server = new Server(serverExecutor, null, serverBufferPool);
connector = new ServerConnector(server, 1, 1, connectionFactories);
server.addConnector(connector);
}
@ -92,6 +100,8 @@ public class AbstractTest
protected void prepareClient()
{
ClientConnector connector = new ClientConnector();
clientBufferPool = new ArrayByteBufferPool.Tracking();
connector.setByteBufferPool(clientBufferPool);
QueuedThreadPool clientExecutor = new QueuedThreadPool();
clientExecutor.setName("client");
connector.setExecutor(clientExecutor);
@ -127,8 +137,18 @@ public class AbstractTest
@AfterEach
public void dispose() throws Exception
{
try
{
if (serverBufferPool != null)
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Server leaks: " + serverBufferPool.dumpLeaks(), serverBufferPool.getLeaks().size(), is(0)));
if (clientBufferPool != null)
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Client leaks: " + clientBufferPool.dumpLeaks(), clientBufferPool.getLeaks().size(), is(0)));
}
finally
{
LifeCycle.stop(httpClient);
LifeCycle.stop(server);
}
}
}

View File

@ -164,6 +164,7 @@ public class PrefaceTest extends AbstractTest
List<ByteBuffer> buffers = accumulator.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
accumulator.release();
Queue<SettingsFrame> settings = new ArrayDeque<>();
AtomicBoolean closed = new AtomicBoolean();

View File

@ -43,6 +43,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
@ -57,6 +58,9 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -66,12 +70,16 @@ public class RawHTTP2ProxyTest
private final List<Server> servers = new ArrayList<>();
private final List<HTTP2Client> clients = new ArrayList<>();
private final List<ArrayByteBufferPool.Tracking> serverBufferPools = new ArrayList<>();
private final List<ArrayByteBufferPool.Tracking> clientBufferPools = new ArrayList<>();
private Server startServer(String name, ServerSessionListener listener) throws Exception
{
QueuedThreadPool serverExecutor = new QueuedThreadPool();
serverExecutor.setName(name);
Server server = new Server(serverExecutor);
ArrayByteBufferPool.Tracking pool = new ArrayByteBufferPool.Tracking();
serverBufferPools.add(pool);
Server server = new Server(serverExecutor, null, pool);
RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener);
ServerConnector connector = new ServerConnector(server, 1, 1, connectionFactory);
server.addConnector(connector);
@ -88,6 +96,9 @@ public class RawHTTP2ProxyTest
clientExecutor.setName(name);
client.setExecutor(clientExecutor);
clients.add(client);
ArrayByteBufferPool.Tracking pool = new ArrayByteBufferPool.Tracking();
clientBufferPools.add(pool);
client.setByteBufferPool(pool);
client.start();
return client;
}
@ -95,6 +106,25 @@ public class RawHTTP2ProxyTest
@AfterEach
public void dispose() throws Exception
{
try
{
for (int i = 0; i < serverBufferPools.size(); i++)
{
ArrayByteBufferPool.Tracking serverBufferPool = serverBufferPools.get(i);
int idx = i;
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Server #" + idx + " leaks: " + serverBufferPool.dumpLeaks(), serverBufferPool.getLeaks().size(), is(0)));
}
for (int i = 0; i < clientBufferPools.size(); i++)
{
ArrayByteBufferPool.Tracking clientBufferPool = clientBufferPools.get(i);
int idx = i;
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Client #" + idx + " leaks: " + clientBufferPool.dumpLeaks(), clientBufferPool.getLeaks().size(), is(0)));
}
}
finally
{
serverBufferPools.clear();
clientBufferPools.clear();
for (int i = clients.size() - 1; i >= 0; i--)
{
HTTP2Client client = clients.get(i);
@ -106,6 +136,7 @@ public class RawHTTP2ProxyTest
server.stop();
}
}
}
@Test
public void testRawHTTP2Proxy() throws Exception

View File

@ -61,8 +61,24 @@ public class RequestTrailersTest extends AbstractTest
MetaData.Response response = new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_2, HttpFields.EMPTY);
HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true);
stream.headers(responseFrame, Callback.NOOP);
stream.demand();
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
while (true)
{
Stream.Data data = stream.readData();
if (data != null)
data.release();
if (data == null || !data.frame().isEndStream())
stream.demand();
if (data == null || data.frame().isEndStream())
break;
}
}
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{

View File

@ -324,7 +324,7 @@ public class SettingsTest extends AbstractTest
MetaData.Request push = newRequest("GET", "/push", HttpFields.EMPTY);
PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), 2, push);
session.getGenerator().control(accumulator, pushFrame);
session.getEndPoint().write(Callback.NOOP, accumulator.getByteBuffers().toArray(ByteBuffer[]::new));
session.getEndPoint().write(Callback.from(accumulator::release), accumulator.getByteBuffers().toArray(ByteBuffer[]::new));
return null;
}
catch (HpackException x)

View File

@ -204,7 +204,7 @@ public class StreamCountTest extends AbstractTest
ByteBufferPool.Accumulator accumulator = new ByteBufferPool.Accumulator();
generator.control(accumulator, frame3);
generator.data(accumulator, data3, data3.remaining());
((HTTP2Session)session).getEndPoint().write(Callback.NOOP, accumulator.getByteBuffers().toArray(ByteBuffer[]::new));
((HTTP2Session)session).getEndPoint().write(Callback.from(accumulator::release), accumulator.getByteBuffers().toArray(ByteBuffer[]::new));
// Expect 2 RST_STREAM frames.
assertTrue(sessionResetLatch.await(5, TimeUnit.SECONDS));

View File

@ -84,6 +84,7 @@ import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -417,29 +418,38 @@ public class StreamResetTest extends AbstractTest
@Test
public void testClientResetConsumesQueuedData() throws Exception
{
CountDownLatch dataLatch = new CountDownLatch(1);
start(new Handler.Abstract()
AtomicReference<HTTP2Stream> serverStreamRef = new AtomicReference<>();
start(new ServerSessionListener()
{
@Override
public boolean handle(Request request, Response response, Callback callback) throws Exception
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
// Wait for the data to be sent.
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
callback.succeeded();
return true;
stream.demand();
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
// Do not read the data.
serverStreamRef.set((HTTP2Stream)stream);
}
};
}
});
Session client = newClientSession(new Session.Listener() {});
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
HeadersFrame frame = new HeadersFrame(request, null, false);
FuturePromise<Stream> promise = new FuturePromise<>();
client.newStream(frame, promise, null);
Stream stream = promise.get(5, TimeUnit.SECONDS);
Stream stream = client.newStream(frame, null).get(5, TimeUnit.SECONDS);
ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
stream.data(new DataFrame(stream.getId(), data, false), Callback.from(dataLatch::countDown));
// The server does not read the data, so the flow control window should be zero.
assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
stream.data(new DataFrame(stream.getId(), data, false), Callback.NOOP);
// Wait for the server to receive all the data.
await().atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() != null);
HTTP2Stream serverStream = serverStreamRef.get();
await().atMost(5, TimeUnit.SECONDS).until(() -> serverStream.getDataLength() == FlowControlStrategy.DEFAULT_WINDOW_SIZE);
// The server does not read the data, so the client flow control window should be zero.
assertEquals(0, ((HTTP2Session)client).updateSendWindow(0));
// Now reset the stream.
@ -913,6 +923,7 @@ public class StreamResetTest extends AbstractTest
generator.control(accumulator, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code));
buffers = accumulator.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
accumulator.release();
assertTrue(writeLatch1.await(5, TimeUnit.SECONDS));
assertTrue(writeLatch2.await(5, TimeUnit.SECONDS));
@ -1011,6 +1022,7 @@ public class StreamResetTest extends AbstractTest
generator.control(accumulator, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code));
buffers = accumulator.getByteBuffers();
socket.write(buffers.toArray(new ByteBuffer[0]));
accumulator.release();
// Wait to be sure that the server processed the reset.
Thread.sleep(1000);
// Let the request write, it should not block.
@ -1076,6 +1088,76 @@ public class StreamResetTest extends AbstractTest
assertFalse(failureLatch.await(1, TimeUnit.SECONDS));
}
@Test
public void testStreamResetDrainsData() throws Exception
{
AtomicReference<HTTP2Stream> serverStreamRef = new AtomicReference<>();
start(new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
serverStreamRef.set((HTTP2Stream)stream);
stream.demand();
return new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
// Do not read DATA frames.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));
}
};
}
});
CountDownLatch resetLatch = new CountDownLatch(1);
Session client = newClientSession(new Session.Listener() {});
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(request, null, false);
Stream stream = client.newStream(requestFrame, new Stream.Listener()
{
@Override
public void onReset(Stream stream, ResetFrame frame, Callback callback)
{
resetLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true));
assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
// After the reset, the server stream should be drained.
assertEquals(0, serverStreamRef.get().getDataLength());
}
@Test
public void testDataAfterLastFrameResets() throws Exception
{
start(new ServerSessionListener() {});
CountDownLatch resetLatch = new CountDownLatch(1);
Session client = newClientSession(new Session.Listener() {});
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
HeadersFrame requestFrame = new HeadersFrame(request, null, true);
Stream stream = client.newStream(requestFrame, new Stream.Listener()
{
@Override
public void onReset(Stream stream, ResetFrame frame, Callback callback)
{
resetLatch.countDown();
}
}).get(5, TimeUnit.SECONDS);
// The HEADERS frame had endStream=true, send a DATA frame with endStream=true, expect RST_STREAM.
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE), true));
assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
// The client session window should be open.
await().atMost(5, TimeUnit.SECONDS).until(() -> ((HTTP2Session)stream.getSession()).updateSendWindow(0), greaterThan(0));
}
private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException
{
long start = NanoTime.now();

View File

@ -631,6 +631,7 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
private final Throwable acquireStack;
private final List<Throwable> retainStacks = new CopyOnWriteArrayList<>();
private final List<Throwable> releaseStacks = new CopyOnWriteArrayList<>();
private final List<Throwable> overReleaseStacks = new CopyOnWriteArrayList<>();
private Buffer(RetainableByteBuffer wrapped, int size)
{
@ -664,6 +665,8 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
@Override
public boolean release()
{
try
{
boolean released = super.release();
if (released)
@ -675,6 +678,13 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
releaseStacks.add(new Throwable());
return released;
}
catch (IllegalStateException e)
{
buffers.add(this);
overReleaseStacks.add(new Throwable());
throw e;
}
}
public String dump()
{
@ -691,6 +701,11 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
{
releaseStack.printStackTrace(pw);
}
pw.println("\n" + overReleaseStacks.size() + " over-release(s)");
for (Throwable overReleaseStack : overReleaseStacks)
{
overReleaseStack.printStackTrace(pw);
}
return "%s@%x of %d bytes on %s wrapping %s acquired at %s".formatted(getClass().getSimpleName(), hashCode(), getSize(), getAcquireInstant(), getWrapped(), w);
}
}

View File

@ -55,7 +55,6 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -285,10 +284,6 @@ public class HttpClientDemandTest extends AbstractTest
// Demand once more to trigger response success.
demanderRef.get().run();
assertTrue(resultLatch.await(5, TimeUnit.SECONDS));
// Make sure the chunks were not leaked.
assertThrows(IllegalStateException.class, c1::release);
assertThrows(IllegalStateException.class, c2::release);
}
private static String asStringAndRelease(Content.Chunk chunk)