From 53de4c82984e143284405704110c2bbaa6aba715 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 4 Oct 2023 14:47:26 +0200 Subject: [PATCH] Fixes #10611 - Flaky StreamResetTest.testClientResetConsumesQueuedData() (#10655) Fixed test case that was racy. When the DATA frames arrived at the server before the call to consumeAvailable(), they were read and the client flow control window re-opened. If it happened that the DATA frames arrived at the server after the call to consumeAvailable(), the client flow control window was not re-opened, making the test flaky. Fixed by avoiding the race in the test. Added over-release buffer tracking, add leak tracking to H2 tests, fix client leaks in tests. Also reviewed the places that required re-opening of the flow control window in case the DATA frames are not read. Signed-off-by: Simone Bordet Signed-off-by: Ludovic Orban Co-authored-by: Ludovic Orban --- .../org/eclipse/jetty/http2/HTTP2Stream.java | 62 ++++++++-- .../jetty/http2/tests/AbstractTest.java | 26 ++++- .../jetty/http2/tests/PrefaceTest.java | 1 + .../jetty/http2/tests/RawHTTP2ProxyTest.java | 45 ++++++-- .../http2/tests/RequestTrailersTest.java | 16 +++ .../jetty/http2/tests/SettingsTest.java | 2 +- .../jetty/http2/tests/StreamCountTest.java | 2 +- .../jetty/http2/tests/StreamResetTest.java | 108 +++++++++++++++--- .../eclipse/jetty/io/ArrayByteBufferPool.java | 29 +++-- .../transport/HttpClientDemandTest.java | 5 - 10 files changed, 251 insertions(+), 45 deletions(-) diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index e17b2cc305b..feda6c9d9be 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -162,6 +162,7 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum @Override public void reset(ResetFrame frame, Callback callback) { + int flowControlLength; Throwable resetFailure = null; try (AutoLock ignored = lock.lock()) { @@ -174,7 +175,9 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum localReset = true; failure = new EOFException("reset"); } + flowControlLength = drain(); } + session.dataConsumed(this, flowControlLength); if (resetFailure != null) callback.failed(resetFailure); else @@ -340,6 +343,8 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum public void setListener(Listener listener) { this.listener = listener; + if (listener == null) + demand(); } public void process(Frame frame, Callback callback) @@ -418,11 +423,14 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum private void onData(Data data) { + DataFrame frame = data.frame(); + // SPEC: remotely closed streams must be replied with a reset. if (isRemotelyClosed()) { if (LOG.isDebugEnabled()) LOG.debug("Data {} for already closed {}", data, this); + session.dataConsumed(this, data.frame().flowControlLength()); reset(new ResetFrame(streamId, ErrorCode.STREAM_CLOSED_ERROR.code), Callback.NOOP); return; } @@ -432,28 +440,25 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum // Just drop the frame. if (LOG.isDebugEnabled()) LOG.debug("Data {} for already reset {}", data, this); + session.dataConsumed(this, data.frame().flowControlLength()); return; } if (dataLength >= 0) { - DataFrame frame = data.frame(); dataLength -= frame.remaining(); if (dataLength < 0 || (frame.isEndStream() && dataLength != 0)) { if (LOG.isDebugEnabled()) LOG.debug("Invalid data length {} for {}", data, this); + session.dataConsumed(this, data.frame().flowControlLength()); reset(new ResetFrame(streamId, ErrorCode.PROTOCOL_ERROR.code), Callback.NOOP); return; } } - boolean listenerPresent = getListener() != null; - boolean endStream = data.frame().isEndStream(); - if ((listenerPresent || endStream) && offer(data)) + if (offer(data)) processData(); - if (!listenerPresent && updateClose(endStream, CloseState.Event.RECEIVED)) - session.removeStream(this); } private boolean offer(Data data) @@ -555,15 +560,29 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum } } + public long getDataLength() + { + try (AutoLock ignored = lock.lock()) + { + return dataQueue.stream() + .mapToLong(data -> data.frame().remaining()) + .sum(); + } + } + private void onReset(ResetFrame frame, Callback callback) { + int flowControlLength; try (AutoLock ignored = lock.lock()) { remoteReset = true; failure = new EofException("reset"); + flowControlLength = drain(); } close(); - if (session.removeStream(this)) + boolean removed = session.removeStream(this); + session.dataConsumed(this, flowControlLength); + if (removed) notifyReset(this, frame, callback); else callback.succeeded(); @@ -584,17 +603,44 @@ public class HTTP2Stream implements Stream, Attachable, Closeable, Callback, Dum private void onFailure(FailureFrame frame, Callback callback) { + int flowControlLength; try (AutoLock ignored = lock.lock()) { failure = frame.getFailure(); + flowControlLength = drain(); } close(); - if (session.removeStream(this)) + boolean removed = session.removeStream(this); + session.dataConsumed(this, flowControlLength); + if (removed) notifyFailure(this, frame, callback); else callback.succeeded(); } + private int drain() + { + assert lock.isHeldByCurrentThread(); + int length = 0; + while (true) + { + Data data = dataQueue.poll(); + if (data == null) + break; + data.release(); + DataFrame frame = data.frame(); + length += frame.flowControlLength(); + if (frame.isEndStream()) + { + dataQueue.offer(Data.eof(getId())); + break; + } + } + if (LOG.isDebugEnabled()) + LOG.debug("Drained {} bytes for {}", length, this); + return length; + } + public boolean updateClose(boolean update, CloseState.Event event) { if (LOG.isDebugEnabled()) diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/AbstractTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/AbstractTest.java index 36bcfbcdeb0..90bc346b4ce 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/AbstractTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/AbstractTest.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2; import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.Handler; @@ -42,12 +43,18 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + public class AbstractTest { protected Server server; protected ServerConnector connector; protected HTTP2Client http2Client; protected HttpClient httpClient; + private ArrayByteBufferPool.Tracking serverBufferPool; + private ArrayByteBufferPool.Tracking clientBufferPool; protected void start(Handler handler) throws Exception { @@ -84,7 +91,8 @@ public class AbstractTest { QueuedThreadPool serverExecutor = new QueuedThreadPool(); serverExecutor.setName("server"); - server = new Server(serverExecutor); + serverBufferPool = new ArrayByteBufferPool.Tracking(); + server = new Server(serverExecutor, null, serverBufferPool); connector = new ServerConnector(server, 1, 1, connectionFactories); server.addConnector(connector); } @@ -92,6 +100,8 @@ public class AbstractTest protected void prepareClient() { ClientConnector connector = new ClientConnector(); + clientBufferPool = new ArrayByteBufferPool.Tracking(); + connector.setByteBufferPool(clientBufferPool); QueuedThreadPool clientExecutor = new QueuedThreadPool(); clientExecutor.setName("client"); connector.setExecutor(clientExecutor); @@ -128,7 +138,17 @@ public class AbstractTest @AfterEach public void dispose() throws Exception { - LifeCycle.stop(httpClient); - LifeCycle.stop(server); + try + { + if (serverBufferPool != null) + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Server leaks: " + serverBufferPool.dumpLeaks(), serverBufferPool.getLeaks().size(), is(0))); + if (clientBufferPool != null) + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Client leaks: " + clientBufferPool.dumpLeaks(), clientBufferPool.getLeaks().size(), is(0))); + } + finally + { + LifeCycle.stop(httpClient); + LifeCycle.stop(server); + } } } diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/PrefaceTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/PrefaceTest.java index 1a16a298bd0..48c82d4bc53 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/PrefaceTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/PrefaceTest.java @@ -164,6 +164,7 @@ public class PrefaceTest extends AbstractTest List buffers = accumulator.getByteBuffers(); socket.write(buffers.toArray(new ByteBuffer[0])); + accumulator.release(); Queue settings = new ArrayDeque<>(); AtomicBoolean closed = new AtomicBoolean(); diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java index e8db27522da..2df4ab10761 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RawHTTP2ProxyTest.java @@ -43,6 +43,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.io.ArrayByteBufferPool; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; @@ -57,6 +58,9 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -66,12 +70,16 @@ public class RawHTTP2ProxyTest private final List servers = new ArrayList<>(); private final List clients = new ArrayList<>(); + private final List serverBufferPools = new ArrayList<>(); + private final List clientBufferPools = new ArrayList<>(); private Server startServer(String name, ServerSessionListener listener) throws Exception { QueuedThreadPool serverExecutor = new QueuedThreadPool(); serverExecutor.setName(name); - Server server = new Server(serverExecutor); + ArrayByteBufferPool.Tracking pool = new ArrayByteBufferPool.Tracking(); + serverBufferPools.add(pool); + Server server = new Server(serverExecutor, null, pool); RawHTTP2ServerConnectionFactory connectionFactory = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener); ServerConnector connector = new ServerConnector(server, 1, 1, connectionFactory); server.addConnector(connector); @@ -88,6 +96,9 @@ public class RawHTTP2ProxyTest clientExecutor.setName(name); client.setExecutor(clientExecutor); clients.add(client); + ArrayByteBufferPool.Tracking pool = new ArrayByteBufferPool.Tracking(); + clientBufferPools.add(pool); + client.setByteBufferPool(pool); client.start(); return client; } @@ -95,15 +106,35 @@ public class RawHTTP2ProxyTest @AfterEach public void dispose() throws Exception { - for (int i = clients.size() - 1; i >= 0; i--) + try { - HTTP2Client client = clients.get(i); - client.stop(); + for (int i = 0; i < serverBufferPools.size(); i++) + { + ArrayByteBufferPool.Tracking serverBufferPool = serverBufferPools.get(i); + int idx = i; + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Server #" + idx + " leaks: " + serverBufferPool.dumpLeaks(), serverBufferPool.getLeaks().size(), is(0))); + } + for (int i = 0; i < clientBufferPools.size(); i++) + { + ArrayByteBufferPool.Tracking clientBufferPool = clientBufferPools.get(i); + int idx = i; + await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> assertThat("Client #" + idx + " leaks: " + clientBufferPool.dumpLeaks(), clientBufferPool.getLeaks().size(), is(0))); + } } - for (int i = servers.size() - 1; i >= 0; i--) + finally { - Server server = servers.get(i); - server.stop(); + serverBufferPools.clear(); + clientBufferPools.clear(); + for (int i = clients.size() - 1; i >= 0; i--) + { + HTTP2Client client = clients.get(i); + client.stop(); + } + for (int i = servers.size() - 1; i >= 0; i--) + { + Server server = servers.get(i); + server.stop(); + } } } diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RequestTrailersTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RequestTrailersTest.java index f1e79da4fec..6ddff4571e5 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RequestTrailersTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/RequestTrailersTest.java @@ -61,8 +61,24 @@ public class RequestTrailersTest extends AbstractTest MetaData.Response response = new MetaData.Response(HttpStatus.OK_200, null, HttpVersion.HTTP_2, HttpFields.EMPTY); HeadersFrame responseFrame = new HeadersFrame(stream.getId(), response, null, true); stream.headers(responseFrame, Callback.NOOP); + stream.demand(); return new Stream.Listener() { + @Override + public void onDataAvailable(Stream stream) + { + while (true) + { + Stream.Data data = stream.readData(); + if (data != null) + data.release(); + if (data == null || !data.frame().isEndStream()) + stream.demand(); + if (data == null || data.frame().isEndStream()) + break; + } + } + @Override public void onHeaders(Stream stream, HeadersFrame frame) { diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/SettingsTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/SettingsTest.java index 6d0d7383f60..8a1033b957c 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/SettingsTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/SettingsTest.java @@ -324,7 +324,7 @@ public class SettingsTest extends AbstractTest MetaData.Request push = newRequest("GET", "/push", HttpFields.EMPTY); PushPromiseFrame pushFrame = new PushPromiseFrame(stream.getId(), 2, push); session.getGenerator().control(accumulator, pushFrame); - session.getEndPoint().write(Callback.NOOP, accumulator.getByteBuffers().toArray(ByteBuffer[]::new)); + session.getEndPoint().write(Callback.from(accumulator::release), accumulator.getByteBuffers().toArray(ByteBuffer[]::new)); return null; } catch (HpackException x) diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamCountTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamCountTest.java index 5dfc84b525b..510da57b82d 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamCountTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamCountTest.java @@ -204,7 +204,7 @@ public class StreamCountTest extends AbstractTest ByteBufferPool.Accumulator accumulator = new ByteBufferPool.Accumulator(); generator.control(accumulator, frame3); generator.data(accumulator, data3, data3.remaining()); - ((HTTP2Session)session).getEndPoint().write(Callback.NOOP, accumulator.getByteBuffers().toArray(ByteBuffer[]::new)); + ((HTTP2Session)session).getEndPoint().write(Callback.from(accumulator::release), accumulator.getByteBuffers().toArray(ByteBuffer[]::new)); // Expect 2 RST_STREAM frames. assertTrue(sessionResetLatch.await(5, TimeUnit.SECONDS)); diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamResetTest.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamResetTest.java index 15e17a05b81..a5de54689e0 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamResetTest.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/StreamResetTest.java @@ -84,6 +84,7 @@ import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -417,29 +418,38 @@ public class StreamResetTest extends AbstractTest @Test public void testClientResetConsumesQueuedData() throws Exception { - CountDownLatch dataLatch = new CountDownLatch(1); - start(new Handler.Abstract() + AtomicReference serverStreamRef = new AtomicReference<>(); + start(new ServerSessionListener() { @Override - public boolean handle(Request request, Response response, Callback callback) throws Exception + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) { - // Wait for the data to be sent. - assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); - callback.succeeded(); - return true; + stream.demand(); + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + // Do not read the data. + serverStreamRef.set((HTTP2Stream)stream); + } + }; } }); Session client = newClientSession(new Session.Listener() {}); MetaData.Request request = newRequest("GET", HttpFields.EMPTY); HeadersFrame frame = new HeadersFrame(request, null, false); - FuturePromise promise = new FuturePromise<>(); - client.newStream(frame, promise, null); - Stream stream = promise.get(5, TimeUnit.SECONDS); + Stream stream = client.newStream(frame, null).get(5, TimeUnit.SECONDS); ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE); - stream.data(new DataFrame(stream.getId(), data, false), Callback.from(dataLatch::countDown)); - // The server does not read the data, so the flow control window should be zero. - assertTrue(dataLatch.await(5, TimeUnit.SECONDS)); + stream.data(new DataFrame(stream.getId(), data, false), Callback.NOOP); + + // Wait for the server to receive all the data. + await().atMost(5, TimeUnit.SECONDS).until(() -> serverStreamRef.get() != null); + HTTP2Stream serverStream = serverStreamRef.get(); + await().atMost(5, TimeUnit.SECONDS).until(() -> serverStream.getDataLength() == FlowControlStrategy.DEFAULT_WINDOW_SIZE); + + // The server does not read the data, so the client flow control window should be zero. assertEquals(0, ((HTTP2Session)client).updateSendWindow(0)); // Now reset the stream. @@ -913,6 +923,7 @@ public class StreamResetTest extends AbstractTest generator.control(accumulator, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); buffers = accumulator.getByteBuffers(); socket.write(buffers.toArray(new ByteBuffer[0])); + accumulator.release(); assertTrue(writeLatch1.await(5, TimeUnit.SECONDS)); assertTrue(writeLatch2.await(5, TimeUnit.SECONDS)); @@ -1011,6 +1022,7 @@ public class StreamResetTest extends AbstractTest generator.control(accumulator, new ResetFrame(streamId, ErrorCode.CANCEL_STREAM_ERROR.code)); buffers = accumulator.getByteBuffers(); socket.write(buffers.toArray(new ByteBuffer[0])); + accumulator.release(); // Wait to be sure that the server processed the reset. Thread.sleep(1000); // Let the request write, it should not block. @@ -1076,6 +1088,76 @@ public class StreamResetTest extends AbstractTest assertFalse(failureLatch.await(1, TimeUnit.SECONDS)); } + @Test + public void testStreamResetDrainsData() throws Exception + { + AtomicReference serverStreamRef = new AtomicReference<>(); + start(new ServerSessionListener() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + serverStreamRef.set((HTTP2Stream)stream); + stream.demand(); + return new Stream.Listener() + { + @Override + public void onDataAvailable(Stream stream) + { + // Do not read DATA frames. + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code)); + } + }; + } + }); + + CountDownLatch resetLatch = new CountDownLatch(1); + Session client = newClientSession(new Session.Listener() {}); + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + HeadersFrame requestFrame = new HeadersFrame(request, null, false); + Stream stream = client.newStream(requestFrame, new Stream.Listener() + { + @Override + public void onReset(Stream stream, ResetFrame frame, Callback callback) + { + resetLatch.countDown(); + } + }).get(5, TimeUnit.SECONDS); + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(1024), true)); + + assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + + // After the reset, the server stream should be drained. + assertEquals(0, serverStreamRef.get().getDataLength()); + } + + @Test + public void testDataAfterLastFrameResets() throws Exception + { + start(new ServerSessionListener() {}); + + CountDownLatch resetLatch = new CountDownLatch(1); + Session client = newClientSession(new Session.Listener() {}); + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + HeadersFrame requestFrame = new HeadersFrame(request, null, true); + Stream stream = client.newStream(requestFrame, new Stream.Listener() + { + @Override + public void onReset(Stream stream, ResetFrame frame, Callback callback) + { + resetLatch.countDown(); + } + }).get(5, TimeUnit.SECONDS); + + // The HEADERS frame had endStream=true, send a DATA frame with endStream=true, expect RST_STREAM. + stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE), true)); + + assertTrue(resetLatch.await(5, TimeUnit.SECONDS)); + + // The client session window should be open. + await().atMost(5, TimeUnit.SECONDS).until(() -> ((HTTP2Session)stream.getSession()).updateSendWindow(0), greaterThan(0)); + } + private void waitUntilTCPCongested(WriteFlusher flusher) throws TimeoutException, InterruptedException { long start = NanoTime.now(); diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index d5a9fae126e..46a1c73eb73 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -631,6 +631,7 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable private final Throwable acquireStack; private final List retainStacks = new CopyOnWriteArrayList<>(); private final List releaseStacks = new CopyOnWriteArrayList<>(); + private final List overReleaseStacks = new CopyOnWriteArrayList<>(); private Buffer(RetainableByteBuffer wrapped, int size) { @@ -665,15 +666,24 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable @Override public boolean release() { - boolean released = super.release(); - if (released) + try { - buffers.remove(this); - if (LOG.isDebugEnabled()) - LOG.debug("released {}", this); + boolean released = super.release(); + if (released) + { + buffers.remove(this); + if (LOG.isDebugEnabled()) + LOG.debug("released {}", this); + } + releaseStacks.add(new Throwable()); + return released; + } + catch (IllegalStateException e) + { + buffers.add(this); + overReleaseStacks.add(new Throwable()); + throw e; } - releaseStacks.add(new Throwable()); - return released; } public String dump() @@ -691,6 +701,11 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable { releaseStack.printStackTrace(pw); } + pw.println("\n" + overReleaseStacks.size() + " over-release(s)"); + for (Throwable overReleaseStack : overReleaseStacks) + { + overReleaseStack.printStackTrace(pw); + } return "%s@%x of %d bytes on %s wrapping %s acquired at %s".formatted(getClass().getSimpleName(), hashCode(), getSize(), getAcquireInstant(), getWrapped(), w); } } diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java index 5c181775d88..2403239119f 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java @@ -55,7 +55,6 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -285,10 +284,6 @@ public class HttpClientDemandTest extends AbstractTest // Demand once more to trigger response success. demanderRef.get().run(); assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); - - // Make sure the chunks were not leaked. - assertThrows(IllegalStateException.class, c1::release); - assertThrows(IllegalStateException.class, c2::release); } private static String asStringAndRelease(Content.Chunk chunk)